In [1]:
import configparser
import pyspark.sql.functions as F
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, LongType as LInt, TimestampType, FloatType as Flt


In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [26]:
# test reading udacity's s3
input_data = "s3a://udacity-dend/"

In [None]:
# reading local dir
# input_data = "./data/"


In [None]:
# saving local dir
output_data = "./data/output/"

In [3]:
spark = SparkSession \
    .builder \
    .appName("Data Frames practice") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

In [None]:
# log_data/2018/11/2018-11-12-events.json
# log_data/2018/11/2018-11-13-events.json

In [27]:
log_data = os.path.join(input_data, "log_data/*/*/*.json")
log_data

's3a://udacity-dend/log_data/*/*/*.json'

In [28]:
dfLogData = spark.read.json(log_data)
dfLogData.createOrReplaceTempView("log_table")

In [29]:
dfLogData.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [30]:
dfLogData.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [31]:
dfLogData.count()

8056

In [32]:
dfLogData = dfLogData.withColumn('user_id', dfLogData.userId.cast(Int()))
dfLogData = dfLogData.where(dfLogData.page == 'NextSong')

In [33]:
dfLogData.dropDuplicates()
dfLogData.count()

6820

In [34]:
users_table = dfLogData.selectExpr(['user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level', 'ts'])

In [35]:
users_table.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)
 |-- ts: long (nullable = true)



In [36]:
users_window = Window.partitionBy('user_id').orderBy(F.desc('ts'))

In [37]:
users_table = users_table.withColumn('row_number', F.row_number().over(users_window))

In [38]:
users_table.where((users_table.user_id == 9) | (users_table.user_id == 65)).limit(10).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level,ts,row_number
0,65,Amiya,Davidson,F,paid,1542779804796,1
1,65,Amiya,Davidson,F,paid,1542779604796,2
2,65,Amiya,Davidson,F,paid,1542779150796,3
3,65,Amiya,Davidson,F,paid,1542778881796,4
4,65,Amiya,Davidson,F,paid,1542778644796,5
5,65,Amiya,Davidson,F,paid,1542778408796,6
6,65,Amiya,Davidson,F,paid,1542778194796,7
7,65,Amiya,Davidson,F,paid,1542778005796,8
8,65,Amiya,Davidson,F,paid,1542777793796,9
9,65,Amiya,Davidson,F,paid,1542777758796,10


In [39]:
users_table = users_table.where(users_table.row_number == 1).drop('ts', 'row_number') 

In [40]:
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,85,Kinsley,Young,F,paid
1,65,Amiya,Davidson,F,paid
2,53,Celeste,Williams,F,free
3,78,Chloe,Roth,F,free
4,34,Evelin,Ayala,F,free


In [None]:
users_table.write.mode("overwrite").parquet(os.path.join(output_data, 'users'))

In [None]:
users_table.limit(5).toPandas()

In [None]:
get_timestamp = udf(lambda ts: datetime.fromtimestamp(ts/1000).isoformat())
dfLogData = dfLogData.withColumn('start_time', get_timestamp('ts').cast(TimestampType()))

In [None]:
dfLogData.printSchema()

In [None]:
dfLogData.limit(5).toPandas()

In [None]:
time_table = dfLogData.select('start_time').dropDuplicates() \
    .withColumn('hour', F.hour('start_time')) \
    .withColumn('day', F.dayofmonth('start_time')) \
    .withColumn('week', F.weekofyear('start_time')) \
    .withColumn('month', F.month('start_time')) \
    .withColumn('year', F.year('start_time')) \
    .withColumn('weekday', F.dayofweek('start_time'))

In [None]:
time_table.printSchema()

In [None]:
time_table.limit(5).toPandas()

In [None]:
time_table.write.mode("overwrite").parquet(os.path.join(output_data, 'time'), partitionBy=['year', 'month'])


In [None]:
song_df = spark.sql(
    'SELECT DISTINCT song_id, title, artist_id, artist_name,duration FROM song_df_table'
)

In [None]:
song_df.limit(5).toPandas()

In [None]:
dfLogData.limit(5).toPandas()

In [None]:
# extract columns from joined song and log datasets to create songplays table 

songplays_table = dfLogData.join(song_df, (dfLogData.song == song_df.title) & (dfLogData.artist == song_df.artist_name) & (dfLogData.length == song_df.duration), 'left_outer') \
    .distinct() \
    .select(monotonically_increasing_id().alias("songplay_id"),
             col("start_time"),
             col("user_id"),
             col("level"),
             col("song_id"),
             col("artist_id"),
             col("sessionId").alias('session_id'),
             col("location"),
             col("userAgent").alias("user_agent")
    ).withColumn("month", F.month(col("start_time"))) \
     .withColumn("year", F.year(col("start_time")))

In [None]:
songplays_table.limit(5).toPandas()

In [None]:
songplays_table.write.mode("overwrite").parquet(os.path.join(output_data, 'songplays'), partitionBy=['year', 'month'])

In [None]:
# songplays_table = dfLogData.join(song_df, (dfLogData.song == song_df.title), 'left_outer').distinct().select(monotonically_increasing_id().alias("songplay_id"), col("start_time"), col("user_id"), col("userAgent").alias("user_agent")).withColumn("month", F.month(col("start_time"))).withColumn("year", F.year(col("start_time")))

In [14]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, LongType as LInt, TimestampType, FloatType as Flt
songSchema = R([
    Fld("artist_id", Str()),
    Fld("artist_latitude", Dbl()),
    Fld("artist_location", Str()),
    Fld("artist_longitude", Dbl()),
    Fld("artist_name", Str()),
    Fld("duration", Flt()),
    Fld("num_songs", Int()),
    Fld("song_id", Str()),
    Fld("title", Str()),
    Fld("year", Int())
])

In [15]:
# song_data/A/B/C/TRABCEI128F424C983.json
# song_data/A/A/B/TRAABJL12903CDCF1A.json

In [16]:
song_data = "./data/song-data/*/*/*/*.json"
song_data

'./data/song-data/*/*/*/*.json'

In [17]:
dfSongData = spark.read.json(song_data, schema=songSchema, multiLine=True, mode='PERMISSIVE', columnNameOfCorruptRecord='corrupt_record')

In [18]:
dfSongData.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [19]:
dfSongData.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.163635,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.661591,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.777512,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.362808,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.487717,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [20]:
dfSongData = dfSongData.dropDuplicates()

In [21]:
dfSongData.count()

71

In [22]:
songs_table = songs_table.select(['song_id', 'title', 'artist_id', 'year', 'duration'])

In [23]:
songs_table = songs_table.dropDuplicates()

In [24]:
songs_table.count()

11

In [12]:
# input_data = "./data/"

In [13]:
# test s3 output bucket
output_data = "s3a://my-datalake-bucket-test/"
songs_table.write.mode("overwrite").parquet(os.path.join(output_data, 'songs'), partitionBy=['year', 'artist_id'])

In [None]:
# songs_table.write.mode("overwrite").parquet(os.path.join(output_data, 'songs'), partitionBy=['year', 'artist_id'])

In [None]:
fields = ['artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
exprs = [col + ' as ' + col.replace('artist_', '') for col in fields]
exprs

In [None]:
artist_table = dfSongData.selectExpr('artist_id', *exprs)
artist_table.limit(5).toPandas()

In [None]:
artist_table.write.mode("overwrite").parquet(os.path.join(output_data, 'artists'))

In [None]:
dfSongData.createOrReplaceTempView('song_df_table')