## Testing notebook to explore cleaning the data

In [181]:
import os
import shutil
import zipfile
from datetime import datetime
from functools import reduce

import pandas as pd
from pyspark.sql.functions import udf, year, month, dayofmonth, weekofyear, hour, from_unixtime,\
                                    col, when, regexp_extract, row_number, lit, desc
from pyspark.sql.window import Window

import etl

In [181]:
# Let pandas show up to 200 characters per cell so long strings are not truncated in previews.
pd.options.display.max_colwidth = 200

In [181]:
# Obtain the Spark session from the project's etl module; every read below goes through it.
spark = etl.create_spark_session()

In [186]:
# Directory that receives the parquet tables written by this notebook.
output_dir = './output'

# Directories that hold the extracted raw JSON files.
unzipped_logs_dir = "log_data"
unzipped_songs_dir = "song_data"

In [248]:
# Start from a clean slate: delete any previously extracted raw data so that
# stale files from an earlier run cannot leak into this one.
# The isdir guard keeps the cell safe to run when nothing has been extracted yet.
for extracted_dir in (unzipped_logs_dir, unzipped_songs_dir):
    if os.path.isdir(extracted_dir):
        shutil.rmtree(extracted_dir)

In [191]:
# Unpack the raw log archive into its dedicated directory.
with zipfile.ZipFile('data/log-data.zip', 'r') as log_archive:
    log_archive.extractall(unzipped_logs_dir)

# NOTE(review): the song archive is extracted into the working directory rather
# than into unzipped_songs_dir; presumably the archive already contains a
# top-level song_data/ folder (the song read further down expects that path) - confirm.
with zipfile.ZipFile('data/song-data.zip', 'r') as song_archive:
    song_archive.extractall('.')


In [192]:

# Load every extracted log file; the path is derived from unzipped_logs_dir so the
# read always targets the directory the archive was extracted into.
log_data = spark.read.json(os.path.join(unzipped_logs_dir, '*.json'))

In [None]:
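# Disabled scratch check: list every row in which at least one column consists of exactly two quote characters.
# import pyspark.sql.functions as f
# from functools import reduce

# log_data.where(reduce(lambda x, y: x | y, (f.col(x).rlike(r'^[\'\"]{2}$') for x in log_data.columns))).toPandas()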

In [193]:
# Load the song files, which sit four directory levels below the songs directory.
# The path is built from unzipped_songs_dir instead of repeating the literal name.
song_data = spark.read.json(os.path.join(unzipped_songs_dir, '*', '*', '*', '*', '*.json'))

In [194]:
# Cache song_data: it is reused below for the artists and songs tables and for the songplays join.
song_data.cache()

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [195]:
# Show the column names and types Spark derived for the song files.
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [196]:
# Show the column names and types Spark derived for the log files.
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [197]:
# Keep only the events whose page is 'NextSong'; all tables below are built from these rows.
log_data = log_data.where(log_data['page'] == 'NextSong')

In [198]:
# Cache the filtered log events: they feed the users and times tables and the songplays join.
log_data.cache()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

### Users

In [232]:
# Project the user attributes out of the log events, renamed to snake_case.
user_columns = [
    col('userId').alias('user_id'),
    col('firstName').alias('first_name'),
    col('lastName').alias('last_name'),
    col('gender'),
    col('level'),
]

# distinct() compares all five columns, so a user who appears with more than one
# level yields one row per level. Rows with any null attribute are dropped.
users = log_data.select(*user_columns).distinct().dropna(how='any')

In [233]:
# Preview the first ten user rows as a pandas frame.
users.limit(10).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free
5,23,Morris,Gilmore,M,free
6,75,Joseph,Gutierrez,M,free
7,16,Rylan,George,M,paid
8,2,Jizelle,Benjamin,F,free
9,3,Isaac,Valdez,M,free


In [234]:
# Persist the users table as parquet under output_dir, replacing any earlier run.
(
    users.write
    .mode('overwrite')
    .parquet(os.path.join(output_dir, 'users'))
)

### Times

In [202]:
def _weekday_number(ts_str):
    """Return the weekday of a 'YYYY-MM-DD HH:MM:SS' string as '0' (Sunday) .. '6' (Saturday).

    A null input yields None instead of raising, so rows with a missing
    timestamp can still be removed by a later dropna rather than failing the job.
    """
    if ts_str is None:
        return None
    return datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S').strftime('%w')


# Spark UDF wrapper; the result column is a string because udf defaults to StringType.
dayofweek = udf(_weekday_number)

# Distinct event timestamps: ts is in epoch milliseconds, so it is divided by 1000
# before from_unixtime formats it as a 'yyyy-MM-dd HH:mm:ss' string.
timestamps = (
    log_data
    .select('ts')
    .distinct()
    .withColumn('ts', from_unixtime(col('ts') / 1000, 'yyyy-MM-dd HH:mm:ss'))
)

In [223]:
# Break each formatted timestamp into the calendar parts of the time dimension.
time_columns = [
    col('ts').alias('start_time'),
    hour('ts').alias('hour'),
    dayofmonth('ts').alias('day'),
    weekofyear('ts').alias('week'),
    month('ts').alias('month'),
    year('ts').alias('year'),
    dayofweek(col('ts')).alias('dow'),
]

# De-duplicate and drop any row that has a null in one of the columns.
times = timestamps.select(*time_columns).distinct().dropna(how='any')

In [224]:
# Preview the first ten rows of the time table as a pandas frame.
times.limit(10).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,dow
0,2018-11-27 20:39:22,20,27,48,11,2018,2
1,2018-11-30 15:01:06,15,30,48,11,2018,5
2,2018-11-05 18:25:08,18,5,45,11,2018,1
3,2018-11-12 23:07:32,23,12,46,11,2018,1
4,2018-11-23 18:17:56,18,23,47,11,2018,5
5,2018-11-19 15:40:17,15,19,47,11,2018,1
6,2018-11-14 19:02:02,19,14,46,11,2018,3
7,2018-11-28 14:50:46,14,28,48,11,2018,3
8,2018-11-10 09:05:58,9,10,45,11,2018,6
9,2018-11-23 00:42:32,0,23,47,11,2018,5


In [225]:
# Persist the time table as parquet, partitioned by year and month, replacing any earlier run.
(
    times.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(os.path.join(output_dir, 'times'))
)

In [244]:
# Show the resulting column types of the time table.
times.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- dow: string (nullable = true)



### Artists

In [205]:

def blank_to_null(x):
    """Build a column expression that turns blank values of column ``x`` into null.

    Args:
        x: Name of the column to clean.

    Returns:
        A Column that is null where the value is empty or whitespace-only and
        the original value everywhere else.
    """
    column = col(x)
    # regexp_extract gives back the whole value when it is blank and '' otherwise,
    # so the equality below holds only for blank values.
    blank_match = regexp_extract(column, "(^\\s*$)", 1)
    return when(column == blank_match, None).otherwise(column)

In [206]:
# Columns that make up the artists table.
artist_fields = [
    'artist_id',
    'artist_name',
    'artist_location',
    'artist_latitude',
    'artist_longitude',
]
# Columns that must be present for an artist row to be kept.
non_nulls = ['artist_id', 'artist_name']

# Distinct artists, with blank locations normalised to null.
artists = (
    song_data
    .select(*artist_fields)
    .distinct()
    .withColumn('artist_location', blank_to_null('artist_location'))
)

In [208]:
# Preview the first ten artist rows as a pandas frame.
artists.limit(10).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR3JMC51187B9AE49D,Backstreet Boys,"Orlando, FL",28.53823,-81.37739
1,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
2,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,
3,AR47JEX1187B995D81,SUE THOMPSON,"Nevada, MO",37.83721,-94.35868
4,ARHHO3O1187B989413,Bob Azzam,,,
5,ARAGB2O1187FB3A161,Pucho & His Latin Soul Brothers,,,
6,AREBBGV1187FB523D2,Mike Jones (Featuring CJ_ Mello & Lil' Bran),"Houston, TX",,
7,ARGSAFR1269FB35070,Blingtones,,,
8,AROUOZZ1187B9ABE51,Willie Bobo,"New York, NY [Spanish Harlem]",40.79195,-73.94512
9,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158


In [209]:
def non_null_df(df, required_cols):
    """Keep only the rows of ``df`` in which every required column is non-null.

    Args:
        df: DataFrame to filter.
        required_cols: Iterable of column names that must all be non-null.

    Returns:
        The filtered DataFrame. When ``required_cols`` is empty there is nothing
        to check, so ``df`` is returned unchanged.
    """
    required = list(required_cols)
    # reduce() without an initial value raises TypeError on an empty sequence.
    if not required:
        return df
    all_present = reduce(lambda x, y: x & y, (col(name).isNotNull() for name in required))
    return df.where(all_present)

In [210]:
# Keep only the artist rows in which every column listed in non_nulls is present.
non_null_artists = non_null_df(artists, non_nulls)

In [211]:
# Number of artist rows that remain after the null filter.
non_null_artists.count()

69

In [212]:
# Persist the filtered artists table as parquet under output_dir, replacing any earlier run.
(
    non_null_artists.write
    .mode('overwrite')
    .parquet(os.path.join(output_dir, 'artists'))
)

### Songs

In [237]:
# Columns that make up the songs table.
song_fields = [
    'song_id',
    'title',
    'artist_id',
    'year',
    'duration',
]
# Columns that must be present for a song row to be kept.
non_nulls = ['song_id', 'title', 'artist_id']

# Distinct songs; a missing year is replaced with 0 before de-duplicating.
songs = (
    song_data
    .select(*song_fields)
    .fillna(0, subset=['year'])
    .distinct()
)

In [238]:
# Number of distinct song rows before the null filter.
songs.count()

71

In [215]:
# Keep only the song rows in which every column listed in non_nulls is present.
non_null_songs = non_null_df(songs, non_nulls)

In [216]:
# Number of song rows that remain after the null filter (compare with songs.count()).
non_null_songs.count()

71

In [217]:
# Preview up to twenty of the filtered song rows as a pandas frame.
non_null_songs.limit(20).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
1,SOTTDKS12AB018D69B,It Wont Be Christmas,ARMBR4Y1187B9990EB,0,241.47546
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOIAZJW12AB01853F1,Pink World,AR8ZCNI1187B9A069B,1984,269.81832
4,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco Hell),ARDNS031187B9924F0,2005,186.48771
5,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024
6,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert Hall],ARPBNLO1187FB3D52F,2000,43.36281
7,SOBCOSW12A8C13D398,Rumba De Barcelona,AR7SMBG1187B9B9066,0,218.38322
8,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,0,151.84934
9,SOQHXMF12AB0182363,Young Boy Blues,ARGSJW91187B9B1D6B,0,218.77506


In [218]:
# Persist the songs table as parquet, partitioned by year and artist_id, replacing any earlier run.
(
    non_null_songs.write
    .mode('overwrite')
    .partitionBy('year', 'artist_id')
    .parquet(os.path.join(output_dir, 'songs'))
)

### Songplays

In [219]:
# Match each log event to a song record: both the song title and the artist name must agree.
same_title = log_data.song == song_data.title
same_artist = log_data.artist == song_data.artist_name

# join() defaults to an inner join, so unmatched events are dropped.
joined_data = log_data.join(song_data, same_title & same_artist)

In [220]:
# Global ordering used to number the rows: the most recent play gets id 1.
# NOTE: a window without partitionBy moves all rows into a single partition;
# acceptable for this small sample, but worth revisiting for the full dataset.
w = Window.orderBy(desc('start_time'))

# Build the songplays fact table from the matched log/song rows.
songplays = (
    joined_data
    .select(
        col('ts'),
        col('userId').alias('user_id'),
        col('level'),
        col('song_id'),
        col('artist_id'),
        col('sessionId').alias('session_id'),
        col('location'),
        col('userAgent').alias('user_agent'),
    )
    # ts is in epoch milliseconds; convert to a formatted timestamp string.
    .withColumn('start_time', from_unixtime(col('ts') / 1000, 'yyyy-MM-dd HH:mm:ss'))
    # year/month are kept as columns because the table is partitioned by them on write.
    .withColumn('year', year(col('start_time')))
    .withColumn('month', month(col('start_time')))
    .drop('ts')
    .distinct()
    # Surrogate key (column name was previously misspelled 'sonplay_id').
    .withColumn('songplay_id', row_number().over(w))
    .repartition('year', 'month')
)

In [221]:
# Preview the first ten songplay rows; limiting avoids collecting the whole table to the driver.
songplays.limit(10).toPandas()

Unnamed: 0,user_id,level,song_id,artist_id,session_id,location,user_agent,start_time,year,month,sonplay_id
0,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",2018-11-21 21:56:47,2018,11,1


In [222]:
# Persist the songplays table as parquet, partitioned by year and month, replacing any earlier run.
(
    songplays.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(os.path.join(output_dir, 'songplays'))
)

In [247]:
# Remove the parquet output produced by this exploration run.
# The isdir guard keeps the cell from failing when the directory is already gone
# (for example when the cell is run twice).
if os.path.isdir(output_dir):
    shutil.rmtree(output_dir)