#### **The below ipnynb script would manipulate small zipped files to produce prototype tables that can be queried to  address business questions before applying the script to a big gata.

## Task1: Import Required packages

In [4]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

## Task2: Create local spark session

In [5]:
spark = SparkSession.builder\
                    .master("local")\
                    .appName("SparkifyMusicApp")\
                    .getOrCreate()

#####  Verify task2- Spark session check


In [6]:
spark 
spark.sparkContext.getConf().getAll()   
""" Info on sparksession in memory, context, cinfig detail"""

In [7]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.port', '39261'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'SparkifyMusicApp'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '87121cff9624'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.id', 'local-1561865483031')]

## Task 3: One time task of unzipping song and log files using IMPORT Zipfile python facility

In [6]:
#import zipfile
#zfsong = zipfile.ZipFile("./data/song-data.zip",'r')
#zfsong.extractall("./data/song-data/")
#zflog = zipfile.ZipFile("./data/log-data.zip",'r')
#zflog.extractall("./data/log-data/")
#zfsong.close()
#zflog.close()

## Task 4: Define input-output paths

In [8]:
output_path = "./output_data/"
song_data = "./data/song-data/song_data/*/*/*/*.json"
log_data = "./data/log-data/*.json"

## Task 5: Extract columns, fill table , create temp view and perform parquet for songs and artists tables

### 5.1. Song data processing (create-store)

In [9]:
songs_df = spark.read.json(song_data)
songs_df.printSchema()
""" song data rrad and schema info displayed"""

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



### 5.2. Cleansing and applying constraints to song data

In [10]:
songs_schema = StructType([
    StructField("artist_id", StringType(), False),
    StructField("artist_latitude", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_longitude", DoubleType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("num_songs", LongType(), True),
    StructField("song_id", StringType(), False),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True)
])

###  5.3. Reloading song dataframe following cleansing and applying constraints

In [11]:
songs_df = spark.read.json(song_data,
                    schema=songs_schema,
                    mode="DROPMALFORMED")
"""  Song data dataframe reloaded ignoring malformed records. #DROPMALFORMED : ignores the whole corrupted records."""

### 5.4. Print schema of the song spark dataframe

In [13]:
songs_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



### 5.5. define columns, fill table , create temp view and perform parquet for songs and artists tables

In [14]:
songscols= ["song_id", "title", "artist_id", "year", "duration"] 
songs_table = songs_df[songscols]
songs_table.createOrReplaceTempView("songs")
songs_table = songs_table.write.mode("overwrite").partitionBy("year","artist_id").parquet(output_path + "songs")

In [15]:
artistcols= ["artist_id","artist_name","artist_location","artist_latitude","artist_longitude"]
artists_table = songs_df[artistcols].dropDuplicates()
artists_table.createOrReplaceTempView('artists')
artists_table = artists_table.write.mode("overwrite").parquet(output_path + "artists")

### 5.6. View/extract songs and artists tables

In [16]:
songs_table = spark.sql("""
            SELECT song_id, 
            title,
            artist_id,
            year,
            duration
            FROM songs
            """)
songs_table.limit(5).toPandas()
""" Displays the first 5 records of songs table"""

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
4,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,2005,186.48771


In [17]:
# Get artist table from songs temp view
artists_table = spark.sql("""
                SELECT DISTINCT artist_id,
                artist_name name,
                artist_location location,
                artist_latitude latitude,
                artist_longitude longitude
                FROM artists
                """)
artists_table.limit(5).toPandas()
""" Displays the first 5 records of artists table"""

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARBEBBY1187B9B43DB,Tom Petty,"Gainesville, FL",,
2,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
3,ARD0S291187B9B7BF5,Rated R,Ohio,,
4,ARMBR4Y1187B9990EB,David Martin,California - SF,37.77916,-122.42005


## Task6: Log data reading and parsing to spark dataframe 

In [18]:
logs_df = spark.read.json(log_data)
logs_df.printSchema()
"""log data read; schema displayed"""

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



## Task7: Filter page of NextSong only

In [19]:
# Filter page of NextSong only
logs_df = logs_df.filter(logs_df["page"] == "NextSong")
logs_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


## Task 8: Create temp view of log data for use in directly querying 

In [20]:
# Create temp view used for quering
logs_df.createOrReplaceTempView("logs")

### 8.1. Create and store users table

In [21]:
# Get user table from logs
users_table = spark.sql("""
                    SELECT DISTINCT userId user_id,
                    firstName first_name,
                    lastName last_name,
                    gender,
                    level
                    FROM logs
                    WHERE TRIM(userId) <> ''
                """)
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,98,Jordyn,Powell,F,free
1,34,Evelin,Ayala,F,free
2,85,Kinsley,Young,F,paid
3,38,Gianna,Jones,F,free
4,85,Kinsley,Young,F,free


In [22]:
# Store user table
users_table.write.mode("overwrite").parquet(output_path + "users")

### 8.2. Create and store time table
   Apply udf to get timestamp

In [23]:
# Add in timestamp column for current logs table
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
logs_df = logs_df.withColumn("timestamp", get_timestamp(logs_df.ts))
logs_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796


In [24]:
# Check the schema
logs_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [25]:
# Create temp view used for quering
logs_df.createOrReplaceTempView("time")

In [26]:
# Get time table from user's event logs
time_table = spark.sql("""
    SELECT DISTINCT timestamp start_time,
        HOUR(timestamp) hour,
        DAYOFMONTH(timestamp) day,
        WEEKOFYEAR(timestamp) week,
        MONTH(timestamp) month,
        YEAR(timestamp) year,
        DAYOFWEEK(timestamp) weekday
    FROM time
""")
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 21:04:27.796,21,15,46,11,2018,5
1,2018-11-21 00:57:58.796,0,21,47,11,2018,4
2,2018-11-14 00:17:37.796,0,14,46,11,2018,4
3,2018-11-14 07:08:14.796,7,14,46,11,2018,4
4,2018-11-14 09:02:39.796,9,14,46,11,2018,4


In [27]:
# Write time table to disk with partitioning by year and month
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_path + "time")

## Task 9: Create and store songplays_table

### 9.1. Reload required songs and artists data and recrate views for use to create songplays table

In [28]:
# Create temp view used for SQL query
logs_df.createOrReplaceTempView("logs")

# Load songs table
songs_df = spark.read.parquet(output_path + "songs")
songs_df.createOrReplaceTempView("songs")

# Load artists table
artists_df = spark.read.parquet(output_path + "artists")
artists_df.createOrReplaceTempView("artists")

In [29]:
# Create temp view used for SQL query
logs_df.createOrReplaceTempView("logs")

In [30]:
# Load songs table
songs_table.createOrReplaceTempView("songs")

In [31]:
# Load artists table
artists_table.createOrReplaceTempView('artists')

In [32]:
# Create songplays table
#monotonically_increasing_id() assigns row number to pyspark dataframe
songplays_table = spark.sql("""
SELECT monotonically_increasing_id() songplay_id,
lo.timestamp start_time,
lo.userId user_id,
so.song_id,
ar.artist_id,
lo.sessionId session_id,
lo.location,
lo.userAgent user_agent,
MONTH(lo.timestamp) month,
YEAR(lo.timestamp) year
FROM logs lo
INNER JOIN songs so ON so.title = lo.song AND so.duration = lo.length
INNER JOIN artists ar ON ar.artist_id = so.artist_id AND ar.name = lo.artist
""")
songplays_table.limit(10).toPandas()
"""Produces records for songplays_table"""

Unnamed: 0,songplay_id,start_time,user_id,song_id,artist_id,session_id,location,user_agent,month,year
0,0,2018-11-21 21:56:47.796,15,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018


In [33]:
# Write songplays table to disk
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_path + "songplays")

#### Remove folder with content (one time) 

In [None]:
#import shutil

#shutil.rmtree('./folder')