# Sparkify DataLakes AWS and Spark Project
This notebook is meant to testing purposes
**by jerryespn**
-April 2020-

### Library import

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

### Configuration File for AWS

In [2]:
config = configparser.ConfigParser()
config.read('conf/dl.cfg')

['conf/dl.cfg']

### AWS access keys taken from configuration file

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

### SparkContext Initialize

In [4]:
def create_spark_session():
    '''
    Initialize the SparkSession for the application, includes AWS package to use it
    '''
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
spark = create_spark_session()

### Work Paths Initialization

In [5]:
# Song Data files
# Uncomment below line for AWS S3
#song_data = "s3a://udacity-dend/song_data/*/*/*"
# Uncomment below line for local files
song_data = "data/song_data/*/*/*"

# Log Data Files
# Uncomment below line for AWS S3
#log_data = "s3a://udacity-dend/log_data/*"
# Uncomment below line for local files
log_data = "data/log_data/*"

# Warehouse
# Root WH
# Uncomment below line for AWS S3
#output_data = "s3a://jerryespn-project-out"
# Uncomment below line for local files
output_data = "spark-warehouse"

# Song WH
# Uncomment below line for AWS S3
#soutput_data = "s3a://jerryespn-project-out/song"
# Uncomment below line for local files
soutput_data = "spark-warehouse/song"

# Log WH
# Uncomment below line for AWS S3
#loutput_data = "s3a://jerryespn-project-out/log"
# Uncomment below line for local files
loutput_data = "spark-warehouse/log"


### Process Song

###### TESTING THE QUERIES #####

In [6]:
df = spark.read.json(song_data)

In [7]:
df.take(10)

[Row(artist_id='ARDR4AC1187FB371A1', artist_latitude=None, artist_location='', artist_longitude=None, artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', duration=511.16363, num_songs=1, song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', year=0),
 Row(artist_id='AREBBGV1187FB523D2', artist_latitude=None, artist_location='Houston, TX', artist_longitude=None, artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", duration=173.66159, num_songs=1, song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', year=0),
 Row(artist_id='ARMAC4T1187FB3FA4C', artist_latitude=40.82624, artist_location='Morris Plains, NJ', artist_longitude=-74.47995, artist_name='The Dillinger Escape Plan', duration=207.77751, num_songs=1, song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', year=2004),
 Row(artist_id='ARPBNLO1187FB3D52F', artist_latitude=40.71455, artist_location='New York, NY', artist_lo

In [8]:
df.createOrReplaceTempView("songs")

In [9]:
song_table = spark.sql ("SELECT DISTINCT song_id, title as song_title, artist_id, year, duration FROM songs")

In [10]:
song_table.take(10)

[Row(song_id='SOGNCJP12A58A80271', song_title='Do You Finally Need A Friend', artist_id='ARB29H41187B98F0EF', year=1972, duration=342.56934),
 Row(song_id='SOOJPRH12A8C141995', song_title='Loaded Like A Gun', artist_id='ARBGXIG122988F409D', year=0, duration=173.19138),
 Row(song_id='SOFCHDR12AB01866EF', song_title='Living Hell', artist_id='AREVWGE1187B9B890A', year=0, duration=282.43546),
 Row(song_id='SOWTBJW12AC468AC6E', song_title='Broken-Down Merry-Go-Round', artist_id='ARQGYP71187FB44566', year=0, duration=151.84934),
 Row(song_id='SOGOSOV12AF72A285E', song_title='¿Dónde va Chichi?', artist_id='ARGUVEV1187B98BA17', year=1997, duration=313.12934),
 Row(song_id='SOTUKVB12AB0181477', song_title='Blessed Assurance', artist_id='AR7ZKHQ1187B98DD73', year=1993, duration=270.602),
 Row(song_id='SOMVWWT12A58A7AE05', song_title='Knocked Out Of The Park', artist_id='ARQ9BO41187FB5CF1F', year=0, duration=183.17016),
 Row(song_id='SOBEBDG12A58A76D60', song_title='Kassie Jones', artist_id='ARI3

In [11]:
song_table.write.partitionBy("year", "artist_id").parquet(path = soutput_data + "/song/songs.parquet", mode = "overwrite")

In [12]:
artist_table = spark.sql ("SELECT DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude FROM songs")

In [13]:
artist_table.take(10)

[Row(artist_id='ARPBNLO1187FB3D52F', artist_name='Tiny Tim', artist_location='New York, NY', artist_latitude=40.71455, artist_longitude=-74.00712),
 Row(artist_id='ARBEBBY1187B9B43DB', artist_name='Tom Petty', artist_location='Gainesville, FL', artist_latitude=None, artist_longitude=None),
 Row(artist_id='AR0IAWL1187B9A96D0', artist_name='Danilo Perez', artist_location='Panama', artist_latitude=8.4177, artist_longitude=-80.11278),
 Row(artist_id='ARMBR4Y1187B9990EB', artist_name='David Martin', artist_location='California - SF', artist_latitude=37.77916, artist_longitude=-122.42005),
 Row(artist_id='ARD0S291187B9B7BF5', artist_name='Rated R', artist_location='Ohio', artist_latitude=None, artist_longitude=None),
 Row(artist_id='AR0RCMP1187FB3F427', artist_name='Billie Jo Spears', artist_location='Beaumont, TX', artist_latitude=30.08615, artist_longitude=-94.10158),
 Row(artist_id='ARKRRTF1187B9984DA', artist_name='Sonora Santanera', artist_location='', artist_latitude=None, artist_longi

In [14]:
artist_table.write.parquet(path = output_data + "/artist/artists.parquet", mode = "overwrite")

### Process Log

In [15]:
df2 = spark.read.json(log_data)

In [16]:
df2.take(10)

[Row(_corrupt_record='PK\x03\x04\x14\x00\x00\x00\x08\x00�j�N��t�\x04\x00\x00�\x1b\x00\x00\x16\x00\x00\x002018-11-01-events.jsonՙ�n�8\x10���)\x08�l\x0b؊(�^9��m\x1c��\x1bw\x17\x01���\u0604)2��$n�w���t�mWr\x02\x15u\x01\x03�(', artist=None, auth=None, firstName=None, gender=None, itemInSession=None, lastName=None, length=None, level=None, location=None, method=None, page=None, registration=None, sessionId=None, song=None, status=None, ts=None, userAgent=None, userId=None),
 Row(_corrupt_record='5�7?5\x1c}v�XQXgW�R�\x1c(���uN�lƧ����s)LaO!��>\x06i���\x19WS<�u�x",�{jȋBh��z-G�7�C��x��j����1�_s�\x17/', artist=None, auth=None, firstName=None, gender=None, itemInSession=None, lastName=None, length=None, level=None, location=None, method=None, page=None, registration=None, sessionId=None, song=None, status=None, ts=None, userAgent=None, userId=None),
 Row(_corrupt_record='__�\x19ص�3\x04E\x0e', artist=None, auth=None, firstName=None, gender=None, itemInSession=None, lastName=None, length=None, leve

In [17]:
df2.createOrReplaceTempView("staging_events")

In [18]:
# Filtering by NextSong
filtered_songplays_query = spark.sql ("SELECT *, cast(ts/1000 as Timestamp) AS timestamp FROM staging_events WHERE page = 'NextSong'")

In [19]:
filtered_songplays_query.take(10)

[Row(_corrupt_record=None, artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp=datetime.datetime(2018, 11, 15, 0, 30, 26, 796000)),
 Row(_corrupt_record=None, artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromi

In [20]:
filtered_songplays_query.createOrReplaceTempView("staging_events")

##### Users Table extraction and loading into parquet files

In [21]:
users_table = spark.sql ("""SELECT a.userId, a.level, a.firstName, a.lastName, a.gender FROM staging_events a INNER JOIN (SELECT userId, MAX(ts) AS TS FROM staging_events GROUP BY userId, page) b ON a.userId = b.userId AND a.ts = b.ts""")

In [22]:
users_table.take(10)

[Row(userId='65', level='paid', firstName='Amiya', lastName='Davidson', gender='F'),
 Row(userId='59', level='free', firstName='Lily', lastName='Cooper', gender='F'),
 Row(userId='99', level='free', firstName='Ann', lastName='Banks', gender='F'),
 Row(userId='34', level='free', firstName='Evelin', lastName='Ayala', gender='F'),
 Row(userId='62', level='free', firstName='Connar', lastName='Moreno', gender='M'),
 Row(userId='40', level='free', firstName='Tucker', lastName='Garrison', gender='M'),
 Row(userId='76', level='free', firstName='Jayden', lastName='Duffy', gender='F'),
 Row(userId='100', level='free', firstName='Adler', lastName='Barrera', gender='M'),
 Row(userId='19', level='free', firstName='Zachary', lastName='Thomas', gender='M'),
 Row(userId='69', level='free', firstName='Anabelle', lastName='Simpson', gender='F')]

In [23]:
# Extracting unique users
clean_users_table = users_table.dropDuplicates(['userId','level'])

In [24]:
clean_users_table.count()

96

In [25]:
clean_users_table.take(10)

[Row(userId='18', level='free', firstName='Jacob', lastName='Rogers', gender='M'),
 Row(userId='6', level='free', firstName='Cecilia', lastName='Owens', gender='F'),
 Row(userId='79', level='free', firstName='James', lastName='Martin', gender='M'),
 Row(userId='16', level='paid', firstName='Rylan', lastName='George', gender='M'),
 Row(userId='39', level='free', firstName='Walter', lastName='Frye', gender='M'),
 Row(userId='61', level='free', firstName='Samuel', lastName='Gonzalez', gender='M'),
 Row(userId='87', level='free', firstName='Dustin', lastName='Lee', gender='M'),
 Row(userId='22', level='free', firstName='Sean', lastName='Wilson', gender='F'),
 Row(userId='29', level='paid', firstName='Jacqueline', lastName='Lynch', gender='F'),
 Row(userId='5', level='free', firstName='Elijah', lastName='Davis', gender='M')]

In [26]:
clean_users_table.write.parquet(path = loutput_data + "/users/users.parquet", mode = "overwrite")

##### Write time table to parquet files partitioned by year and month

In [27]:
time_table = spark.sql("""SELECT DISTINCT timestamp AS start_time, hour(timestamp) AS hour, day(timestamp) AS day, weekofyear(timestamp) AS week, month(timestamp) AS month, year (timestamp) AS year, weekday(timestamp) AS weekday FROM staging_events""")

In [28]:
time_table.write.partitionBy("year", "month").parquet(path = loutput_data + "/time/time.parquet", mode = "overwrite")

##### Read in song data to use for songplays table

In [29]:
s_df = spark.read.parquet(soutput_data + "/song/*")
s_df.createOrReplaceTempView("songs")

##### Extract columns from joined song and log datasets to create songplays table 

In [30]:
songplays_table = spark.sql("""SELECT a.timestamp AS start_time, a.userId, a.level, b.song_id, b.artist_id, a.sessionId, a.location, a.userAgent, year(a.timestamp) AS year, month(a.timestamp) AS month FROM staging_events AS a INNER JOIN songs AS b ON a.song = b.song_title""")
#songplays_table = spark.sql("SELECT * FROM songs")

In [31]:
songplays_table.take(10)

[Row(start_time=datetime.datetime(2018, 11, 21, 21, 56, 47, 796000), userId='15', level='paid', song_id='SOZCTXZ12AB0182364', artist_id='AR5KOSW1187FB35FF4', sessionId=818, location='Chicago-Naperville-Elgin, IL-IN-WI', userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', year=2018, month=11),
 Row(start_time=datetime.datetime(2018, 11, 14, 5, 6, 3, 796000), userId='10', level='free', song_id='SOGDBUF12A8C140FAA', artist_id='AR558FS1187FB45658', sessionId=484, location='Washington-Arlington-Alexandria, DC-VA-MD-WV', userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"', year=2018, month=11),
 Row(start_time=datetime.datetime(2018, 11, 19, 9, 14, 20, 796000), userId='24', level='paid', song_id='SOGDBUF12A8C140FAA', artist_id='AR558FS1187FB45658', sessionId=672, location='Lake Havasu City-Kingman, AZ', userAgent='"

In [32]:
songplays_table.write.partitionBy("year", "month").parquet(path = output_data + "/songplays/songplays.parquet", mode = "overwrite")