In [42]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

In [7]:
def create_spark_session():
    """Build (or reuse) a SparkSession that pulls in the hadoop-aws package.

    Returns:
        The SparkSession handed back by ``getOrCreate()``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


# Session shared by every cell below.
spark = create_spark_session()

## Ler dados com schema definido

In [79]:
# TODO: switch to reading from S3
song_data = '/home/miguel/udacity/project_4/data/song_data'

# Explicit schema for the song JSON files, declared as (column, Spark type) pairs.
song_data_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ('artist_id', StringType()),
        ('artist_latitude', DoubleType()),
        ('artist_location', StringType()),
        ('artist_longitude', DoubleType()),
        ('artist_name', StringType()),
        ('duration', DoubleType()),
        ('num_songs', IntegerType()),
        ('song_id', StringType()),
        ('title', StringType()),
        ('year', IntegerType()),
    )
])

# Load the JSON files under song_data (recursive lookup enabled) with the
# schema above instead of letting Spark infer one.
df = (
    spark.read
    .schema(song_data_schema)
    .option('recursiveFileLookup', 'true')
    .json(song_data)
)
df.printSchema()
df.limit(5).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994


## Separar colunas para tabela songs

In [20]:
# Columns of the songs table: song_id, title, artist_id, year, duration
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration')
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
4,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,ARNF6401187FB57032,1994,305.162


In [21]:
song_data_output = '/home/miguel/udacity/project_4/data/output_data/song_data/songs'

# Persist the songs table as parquet, partitioned by year and artist_id,
# replacing whatever already exists at the destination.
songs_table.write.parquet(
    song_data_output,
    mode='overwrite',
    partitionBy=['year', 'artist_id'],
)

## Separar colunas para tabela artists

In [80]:
# Columns of the artists table: artist_id, name, location, latitude, longitude
artists_table = df.select(
    'artist_id',
    F.col('artist_name').alias('name'),
    F.col('artist_location').alias('location'),
    F.col('artist_latitude').alias('latitude'),
    F.col('artist_longitude').alias('longitude'),
)
artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARDR4AC1187FB371A1,Montserrat Caballé;Placido Domingo;Vicente Sar...,,,
1,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
2,ARMAC4T1187FB3FA4C,The Dillinger Escape Plan,"Morris Plains, NJ",40.82624,-74.47995
3,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
4,ARNF6401187FB57032,Sophie B. Hawkins,"New York, NY [Manhattan]",40.79086,-73.96644


In [None]:
artist_data_output = '/home/miguel/udacity/project_4/data/output_data/song_data/artists'

# Persist the artists table as parquet, partitioned by artist_id.
# The destination must be artist_data_output: writing to song_data_output in
# 'overwrite' mode would wipe the songs table written earlier and leave the
# artists location empty.
artists_table.write \
        .mode('overwrite') \
        .partitionBy('artist_id') \
        .format('parquet') \
        .option('path',artist_data_output) \
        .save()

## log data

In [81]:
# TODO: switch to reading from S3
log_data = '/home/miguel/udacity/project_4/data/log_data'

# Explicit schema for the log JSON files, declared as (column, Spark type) pairs.
# 'registration' and 'ts' are deliberately read as strings here.
log_data_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ('artist', StringType()),
        ('auth', StringType()),
        ('firstName', StringType()),
        ('gender', StringType()),
        ('itemInSession', IntegerType()),
        ('lastName', StringType()),
        ('length', DoubleType()),
        ('level', StringType()),
        ('location', StringType()),
        ('method', StringType()),
        ('page', StringType()),
        ('registration', StringType()),
        ('sessionId', IntegerType()),
        ('song', StringType()),
        ('status', IntegerType()),
        ('ts', StringType()),
        ('userAgent', StringType()),
        ('userId', StringType()),
    )
])

# NOTE: this rebinds `df`, which previously held the song data.
df = (
    spark.read
    .schema(log_data_schema)
    .option('recursiveFileLookup', 'true')
    .json(log_data)
)
df.printSchema()
df.limit(3).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


## Converte coluna ts em data e timestamp

In [67]:
from datetime import datetime
@udf(TimestampType())
def parseTimestamp(line):
    """Convert an epoch-milliseconds value into a timestamp.

    Args:
        line: Epoch time in milliseconds, in any form ``int()`` accepts
            (the log schema reads ``ts`` as a string), or None.

    Returns:
        A ``datetime`` for that instant, or None when ``line`` is None.
    """
    # Identity check: `== None` can be overridden by a custom __eq__.
    if line is None:
        return None
    # fromtimestamp() takes seconds and returns a naive datetime in the
    # local timezone of the Python worker.
    return datetime.fromtimestamp(int(line) / 1000)

@udf(DateType())
def parseDate(line):
    """Convert an epoch-milliseconds value into a calendar date.

    Args:
        line: Epoch time in milliseconds, in any form ``int()`` accepts
            (the log schema reads ``ts`` as a string), or None.

    Returns:
        A ``date`` for that instant, or None when ``line`` is None.
    """
    # Identity check: `== None` can be overridden by a custom __eq__.
    if line is None:
        return None
    # fromtimestamp() takes seconds and works in the worker's local timezone.
    # Return a plain date so the value matches the declared DateType instead
    # of relying on a datetime being coerced.
    return datetime.fromtimestamp(int(line) / 1000).date()

In [82]:
# Derive a full timestamp and a calendar date from the raw 'ts' column.
# Note that the column named 'datetime' holds the date (DateType) value.
df = df.withColumn('timestamp', parseTimestamp(col('ts')))
df = df.withColumn('datetime', parseDate(col('ts')))
df.printSchema()
df.limit(3).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: date (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:30:26.796,2018-11-14
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:41:21.796,2018-11-14
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:45:41.796,2018-11-14


## Filtrar linhas para songplays

In [83]:
# Keep only the events whose page is 'NextSong'.
df = df.filter(col('page') == 'NextSong')
df.limit(3).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:30:26.796,2018-11-14
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:41:21.796,2018-11-14
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:45:41.796,2018-11-14


## Separar colunas para tabela users

In [88]:
# Columns of the users table: user_id, first_name, last_name, gender, level
# distinct() is applied to the table itself, not only to the preview, so
# users_table no longer carries one row per log event.
# NOTE(review): a user whose 'level' differs between events still yields one
# row per level value — confirm whether that is wanted.
users_table = (
    df.select(
        col('userId').alias('user_id'),
        col('firstName').alias('first_name'),
        col('lastName').alias('last_name'),
        'gender',
        'level',
    )
    .distinct()
)
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free


## Separar colunas para tabela time

In [107]:
# Columns of the time table: start_time, hour, day, week, month, year, weekday
# (the source 'timestamp' column is kept alongside them).
# start_time is the 'timestamp' column passed through unix_timestamp();
# calendar parts come from the 'datetime' (date) column, hour from 'timestamp'.
time_table = df.select(
    F.col('timestamp'),
    F.unix_timestamp('timestamp').alias('start_time'),
    F.hour('timestamp').alias('hour'),
    F.dayofmonth('datetime').alias('day'),
    F.weekofyear('datetime').alias('week'),
    F.month('datetime').alias('month'),
    F.year('datetime').alias('year'),
    F.dayofweek('datetime').alias('weekday'),
)
time_table.limit(3).toPandas()

Unnamed: 0,timestamp,start_time,hour,day,week,month,year,weekday
0,2018-11-14 22:30:26.796,1542241826,22,14,46,11,2018,4
1,2018-11-14 22:41:21.796,1542242481,22,14,46,11,2018,4
2,2018-11-14 22:45:41.796,1542242741,22,14,46,11,2018,4


## tabela songplays

In [108]:
song_data = '/home/miguel/udacity/project_4/data/song_data'

# Same song schema as used earlier in the notebook, built field by field.
song_data_schema = (
    StructType()
    .add('artist_id', StringType())
    .add('artist_latitude', DoubleType())
    .add('artist_location', StringType())
    .add('artist_longitude', DoubleType())
    .add('artist_name', StringType())
    .add('duration', DoubleType())
    .add('num_songs', IntegerType())
    .add('song_id', StringType())
    .add('title', StringType())
    .add('year', IntegerType())
)

# Song data is loaded again under its own name so that it can be joined with
# the log DataFrame held in `df`.
song_df = (
    spark.read
    .schema(song_data_schema)
    .option('recursiveFileLookup', 'true')
    .json(song_data)
)
song_df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994


## join song and log data

In [115]:
# Peek at two rows of `df` to compare its columns with the song data before joining.
df.limit(2).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp,datetime
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:30:26.796,2018-11-14
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-14 22:41:21.796,2018-11-14


In [113]:
# Peek at two rows of `song_df` to see which columns can serve as join keys.
song_df.limit(2).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0


In [131]:
# Inner join: a log event matches a song row when both the artist name and
# the song title are equal.
joined_df = df.join(
    song_df,
    on=[
        df.artist == song_df.artist_name,
        df.song == song_df.title,
    ],
    how='inner',
)
joined_df.printSchema()
joined_df.limit(2).toPandas()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: date (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nu

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,Elena,Logged In,Lily,F,5,Koch,269.58322,paid,"Chicago-Naperville-Elgin, IL-IN-WI",PUT,...,AR5KOSW1187FB35FF4,49.80388,Dubai UAE,15.47491,Elena,269.58322,1,SOZCTXZ12AB0182364,Setanta matins,0


In [133]:
# Columns of the songplays table:
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
# songplay_id comes from monotonically_increasing_id(); start_time is the
# 'timestamp' column passed through unix_timestamp().
songplays_table = joined_df.select(
    F.monotonically_increasing_id().alias('songplay_id'),
    F.unix_timestamp('timestamp').alias('start_time'),
    F.col('userId').alias('user_id'),
    'level',
    'song_id',
    'artist_id',
    F.col('sessionId').alias('session_id'),
    'location',
    F.col('userAgent').alias('user_agent'),
)
songplays_table.limit(3).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,0,1542837407,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


## testar leitura do bucket S3

In [136]:
# Root of the S3 bucket used to test remote reads.
input_data = 's3a://udacity-dend/'

# Schema of the song JSON files.
song_data_schema = StructType([
    StructField('artist_id',StringType()),
    StructField('artist_latitude',DoubleType()),
    StructField('artist_location',StringType()),
    StructField('artist_longitude',DoubleType()),
    StructField('artist_name',StringType()),
    StructField('duration',DoubleType()),
    StructField('num_songs',IntegerType()),
    StructField('song_id',StringType()),
    StructField('title',StringType()),
    StructField('year',IntegerType()),
])

# Schema of the log JSON files ('registration' and 'ts' are read as strings).
log_data_schema = StructType([
    StructField('artist',StringType()),
    StructField('auth',StringType()),
    StructField('firstName',StringType()),
    StructField('gender',StringType()),
    StructField('itemInSession',IntegerType()),
    StructField('lastName',StringType()),
    StructField('length',DoubleType()),
    StructField('level',StringType()),
    StructField('location',StringType()),
    StructField('method',StringType()),
    StructField('page',StringType()),
    StructField('registration',StringType()),
    StructField('sessionId',IntegerType()),
    StructField('song',StringType()),
    StructField('status',IntegerType()),
    StructField('ts',StringType()),
    StructField('userAgent',StringType()),
    StructField('userId',StringType()),
])

# SECURITY: AWS credentials must never be written into the notebook, not even
# in commented-out code. Export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in
# the environment that launches the kernel (or load them from an untracked
# config file). Any key pair that was previously committed here must be
# treated as compromised and rotated.

In [137]:
# Test read from S3 using the song schema. This rebinds `df`.
# NOTE: the recorded run of this cell failed with "Unable to load AWS
# credentials from any provider in the chain".
# NOTE(review): this reads from the bucket root; presumably it should target
# the song-data prefix only — TODO confirm.
df = (
    spark.read
    .schema(song_data_schema)
    .option('recursiveFileLookup', 'true')
    .json(input_data)
)
df.limit(5).toPandas()

Py4JJavaError: An error occurred while calling o1804.json.
: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
	at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
	at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
	at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:477)
	at jdk.internal.reflect.GeneratedMethodAccessor84.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:830)
