## SET UP

In [1]:
!pip install findspark

import findspark
findspark.init()

Collecting findspark
  Downloading https://files.pythonhosted.org/packages/fc/2d/2e39f9a023479ea798eed4351cd66f163ce61e00c717e03c37109f00c0f2/findspark-1.4.2-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [2]:
# Cargar Pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Sparkify").master("local[*]").getOrCreate()

spark

In [3]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, TimestampType, LongType 

## Process song data

In [4]:
# get filepath to song data file
song_data = "input/song_data/*/*/*/*.json"

In [5]:
song_schema = StructType([
    StructField("artist_id", StringType()),
    StructField("artist_latitude", DoubleType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", StringType()),
    StructField("artist_name", StringType()),
    StructField("duration", DoubleType()),
    StructField("num_songs", IntegerType()),
    StructField("song_id", IntegerType()),
    StructField("title", StringType()),
    StructField("year", IntegerType()),
])

In [6]:
# read song data file
df = spark.read.json(song_data,schema=song_schema)

In [7]:
df.columns

['artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

In [8]:
# extract columns to create songs table
songs_table = (df
               .select(col("song_id")
                       ,col("title")
                       ,col("artist_id")
                       ,col("year")
                       ,col("duration"))
               .dropDuplicates()
              )

In [9]:
 # write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year","artist_id").parquet("output/songs_table.parquet/" , mode="overwrite")

In [10]:
songs_table.show(5)

+-------+-----+---------+----+--------+
|song_id|title|artist_id|year|duration|
+-------+-----+---------+----+--------+
|   null| null|     null|null|    null|
+-------+-----+---------+----+--------+



In [11]:
# extract columns to create artists table
artists_table = (df
               .select(col("artist_id")
                       ,col("artist_name")
                       ,col("artist_location")
                       ,col("artist_latitude")
                       ,col("artist_longitude"))
               .dropDuplicates()
              ) 

In [12]:
# write artists table to parquet files
artists_table.write.parquet("output/artists_table.parquet/" , mode="overwrite")

In [13]:
artists_table.show(5)

+------------------+--------------------+---------------+---------------+----------------+
|         artist_id|         artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+---------------+---------------+----------------+
|ARI3BMM1187FB4255E|        Alice Stuart|     Washington|        38.8991|         -77.029|
|ARWB3G61187FB49404|         Steve Morse| Hamilton, Ohio|           null|            null|
|ARKULSX1187FB45F84|              Trafik|           Utah|       39.49974|      -111.54732|
|ARHHO3O1187B989413|           Bob Azzam|               |           null|            null|
|ARAGB2O1187FB3A161|Pucho & His Latin...|               |           null|            null|
+------------------+--------------------+---------------+---------------+----------------+
only showing top 5 rows



## Process log data

In [14]:
# get filepath to log data file
log_data = "input/log-data/*.json"

In [15]:
log_schema = StructType([
    StructField("artist", StringType()),
    StructField("auth", DoubleType()),
    StructField("first_name", StringType()),
    StructField("gender", StringType()),
    StructField("item_in_session", LongType()),
    StructField("last_name", StringType()),
    StructField("length", DoubleType()),
    StructField("level", StringType()),
    StructField("location", StringType()),
    StructField("method", StringType()),
    StructField("page", StringType()),
    StructField("registration", DoubleType()),
    StructField("session_id", LongType()),
    StructField("song", StringType()),
    StructField("status", LongType()),
    StructField("ts", LongType()),
    StructField("user_agent", StringType()),
    StructField("user_id", StringType())
])

In [16]:
# read log data file
df = spark.read.json(log_data,schema= log_schema)

In [17]:
df.columns

['artist',
 'auth',
 'first_name',
 'gender',
 'item_in_session',
 'last_name',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'session_id',
 'song',
 'status',
 'ts',
 'user_agent',
 'user_id']

In [18]:
types = [f.dataType for f in df.schema.fields]
types

[StringType,
 DoubleType,
 StringType,
 StringType,
 LongType,
 StringType,
 DoubleType,
 StringType,
 StringType,
 StringType,
 StringType,
 DoubleType,
 LongType,
 StringType,
 LongType,
 LongType,
 StringType,
 StringType]

In [19]:
# filter by actions for song plays
df = df.filter("page == 'NextSong'")

In [20]:
# extract columns for users table    
users_table = (df.select(col("user_id")
                         ,col("first_name")
                         ,col("last_name")
                         ,col("gender")
                         ,col("level")
                        )
               .dropDuplicates()
              )

In [21]:
users_table.write.parquet("output/users_table.parquet/" , mode="overwrite") 

In [22]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))
df = df.withColumn('timestamp', get_timestamp('ts'))

In [23]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d'))
df = df.withColumn('datetime', get_datetime('ts'))
df = (df
      .withColumnRenamed("ts","start_time")
      .withColumn('hour',hour("timestamp"))
      .withColumn('day',dayofmonth("datetime"))
      .withColumn('week',weekofyear("datetime"))
      .withColumn('month',month("datetime"))
      .withColumn('year',year("datetime"))
      .withColumn('weekday',dayofweek("datetime"))
     )

In [24]:
# extract columns to create time table
time_table = (df.select(col("start_time")
                      ,col("hour")
                      ,col("day")
                      ,col("week")
                      ,col("month")
                      ,col("year")
                      ,col("weekday")
                      )
              .dropDuplicates()
             )

In [25]:
time_table.show(5)

+-------------+----+---+----+-----+----+-------+
|   start_time|hour|day|week|month|year|weekday|
+-------------+----+---+----+-----+----+-------+
|1542795222796|  10| 21|  47|   11|2018|      4|
|1542799276796|  11| 21|  47|   11|2018|      4|
|1542195401796|  11| 14|  46|   11|2018|      4|
|1543441920796|  21| 28|  48|   11|2018|      4|
|1541407889796|   8|  5|  45|   11|2018|      2|
+-------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [26]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year","month").parquet("output/time_table.parquet/",mode="overwrite")

In [27]:
song_data = "input/song_data/*/*/*/*.json"

In [28]:
song_schema = StructType([
    StructField("artist_id", StringType()),
    StructField("artist_latitude", DoubleType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", StringType()),
    StructField("artist_name", StringType()),
    StructField("duration", DoubleType()),
    StructField("num_songs", IntegerType()),
    StructField("song_id", IntegerType()),
    StructField("title", StringType()),
    StructField("year", IntegerType()),
])

In [29]:
# read in song data to use for songplays table
song_df = spark.read.json(song_data,schema=song_schema).drop("year")

In [30]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = (df.join(song_df, df["song"] == song_df["title"], how='inner')
                   .select(monotonically_increasing_id().alias("songplay_id")
                           ,col("start_time")
                           ,col("user_id")
                           ,col("level")
                           ,col("song_id")
                           ,col("artist_id")
                           ,col("session_id")
                           ,col("location")
                           ,col("user_agent")
                           ,col("year")
                           ,col("month")
                          )
                  )

In [31]:
songplays_table.columns

['songplay_id',
 'start_time',
 'user_id',
 'level',
 'song_id',
 'artist_id',
 'session_id',
 'location',
 'user_agent',
 'year',
 'month']

In [32]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year","month").parquet("output/songplays_table.parquet/",mode="overwrite")