https://knowledge.udacity.com/questions/481917 <br>
https://knowledge.udacity.com/questions/340141

https://knowledge.udacity.com/questions/460708 <br>
https://discussions.apple.com/thread/3242258 

https://knowledge.udacity.com/questions/825541 <br>
https://knowledge.udacity.com/questions/713520

# Sparkify DataLake Project
BY: MAHMOUD NAGY - OCT 2022

- In this notebook, we work locally on the project with a smaller dataset (found in the workspace), and then move on to the bigger dataset on AWS.

In [1]:
# import configparser
from time import time
from datetime import datetime
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col, desc, substring
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType

## Make sure that your AWS credentials are loaded as env vars
We are working locally in this notebook, so we will not use these credentials until we move to the cloud!

In [2]:
# config = configparser.ConfigParser()
# config.read('dl.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']
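
Once the credentials are needed, the commented cell above could look roughly like the sketch below. It assumes a `dl.cfg` with a hypothetical `[AWS]` section (the real file's layout is not shown in this notebook) and uses placeholder values; `read_string` stands in for `config.read('dl.cfg')` so the snippet runs on its own.

```python
import configparser
import os

# Hypothetical contents of dl.cfg; the section name and values are assumptions.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = EXAMPLE_KEY_ID
AWS_SECRET_ACCESS_KEY = EXAMPLE_SECRET
"""

def load_aws_credentials(cfg_text):
    """Copy the two AWS keys from an INI-style config into os.environ."""
    config = configparser.ConfigParser()
    config.read_string(cfg_text)  # with a real file: config.read('dl.cfg')
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        # ConfigParser needs the section first, then the option name
        os.environ[key] = config["AWS"][key]

load_aws_credentials(SAMPLE_CFG)
print(os.environ["AWS_ACCESS_KEY_ID"])  # EXAMPLE_KEY_ID
```

`ConfigParser` always looks options up inside a section, so indexing the parser directly with an option name would look for a section of that name instead.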

## Create Spark Session with the hadoop-aws package
`hadoop-aws` is the library that lets Spark connect to Amazon S3. <br>
We are working locally in this notebook, so we don't need to connect to S3 until we move to the cloud!

In [3]:
# def create_spark_session():
#     spark = SparkSession \
#         .builder \
#         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
#         .getOrCreate()
#     return spark

In [4]:
spark = SparkSession \
        .builder \
        .appName("TestBeforeCLoud") \
        .getOrCreate()
spark

22/10/12 04:04:48 WARN Utils: Your hostname, Mahmouds-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.2 instead (on interface en0)
22/10/12 04:04:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/12 04:04:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


- SparkContext configuration cannot be modified at runtime. You have to stop the existing context first.
- SQLContext configuration can be modified at runtime.

https://stackoverflow.com/questions/32233575/read-all-files-in-a-nested-folder-in-spark

# Song Data

In [5]:
# get filepath to song data file
song_data = "data/song_data"
    
# read song data file
df_songs = spark.read\
          .option("recursiveFileLookup","true")\
          .json(song_data)

print(df_songs.count())
df_songs.printSchema()
df_songs.limit(5).toPandas()

                                                                                

71
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005
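
The `recursiveFileLookup` option matters because the song files sit several directories below `data/song_data`. The plain-Python sketch below illustrates the difference between a top-level and a recursive search; it builds a throwaway directory with two empty placeholder files and does not touch Spark or the real data.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "song_data"
    # Recreate a nested layout with two empty placeholder files.
    for rel in ("A/B/C/TRABCEI128F424C983.json", "A/A/B/TRAABJL12903CDCF1A.json"):
        path = root / rel
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch()

    top_level = sorted(p.name for p in root.glob("*.json"))
    recursive = sorted(p.name for p in root.rglob("*.json"))

print(top_level)   # [] : no JSON files directly under song_data
print(recursive)   # both nested files are found
```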


<br>

## Songs Table

## Extract Columns Using Spark Dataframes

song_id, title, artist_id, year, duration

In [6]:
t0 = time()
# extract columns to create songs table
songs_table = df_songs.select(["song_id", "title", "artist_id", "year", "duration"]).dropDuplicates()

print(time()-t0)
print(songs_table.count())
songs_table.printSchema()
songs_table.limit(5).toPandas()

0.0606839656829834
71
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



Unnamed: 0,song_id,title,artist_id,year,duration
0,SOTTDKS12AB018D69B,It Wont Be Christmas,ARMBR4Y1187B9990EB,0,241.47546
1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
2,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,2005,186.48771
3,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),ARKFYS91187B98E58F,0,267.7024
4,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281


## Extract Columns Using Spark SQL

In [7]:
df_songs.createOrReplaceTempView("songs")
t0 = time()
# extract columns to create songs table
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM songs
""")

print(time()-t0)
print(songs_table.count())
songs_table.printSchema()
songs_table.limit(5).toPandas()

0.07294607162475586
71
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
4,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,ARDNS031187B9924F0,2005,186.48771


<br>

#### Apache Parquet 
- Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval.

In [8]:
# write songs table to parquet files partitioned by year and artist
songs_table.write\
.partitionBy("year", "artist_id")\
.mode('overwrite')\
.parquet('./songs')

                                                                                

In [9]:
# # write songs table to parquet files partitioned by year and artist
# songs_table.write\
# .partitionBy("year", "artist_id")\
# .mode('overwrite')\
# .parquet(os.path.join('s3a://udacity-dend-output/', 'songs'))

<br>

https://towardsdatascience.com/data-skew-in-pyspark-783d529a9dd7

#### 1. Repartition by Column(s)
- The first solution is to logically re-partition your data based on the transformations in your script. 
- In short, if you’re grouping or joining, partitioning by the groupBy/join columns can improve shuffle efficiency.
- `df = df.repartition(<n_partitions>, '<col_1>', '<col_2>',...)`

- `repartition()` partitions data in memory; it shuffles the data, which is why it happens in memory.
- `partitionBy` partitions data on disk when it is written.
- Both can partition data based on DataFrame column(s), and they're often used in conjunction.
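
To make the in-memory side concrete, the sketch below imitates what repartitioning by a column does: each row goes to partition `hash(key) % n_partitions`, so rows sharing a key end up together. It is a plain-Python illustration, not Spark code; `zlib.crc32` is an assumed stand-in for Spark's own hash function, and `assign_partition` is a hypothetical helper.

```python
import zlib
from collections import defaultdict

def assign_partition(key, n_partitions):
    """Map a key to a partition index with a deterministic hash (crc32 as a stand-in)."""
    return zlib.crc32(str(key).encode("utf-8")) % n_partitions

# Made-up (year, title) rows.
rows = [(0, "song_a"), (2004, "song_b"), (0, "song_c"), (2004, "song_d"), (1994, "song_e")]

partitions = defaultdict(list)
for year, title in rows:
    partitions[assign_partition(year, 4)].append((year, title))

# Rows with the same year always share a partition.
for index, content in sorted(partitions.items()):
    print(index, content)
```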

## 2 Different Ways to solve the skewed data problem
- Assign a new, temporary partition key before processing any huge shuffles
- The 2nd method is using `repartition()`
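
The first option, a temporary partition key, is often called salting: a small extra value is attached to the key so that one oversized group is split into several smaller ones. A plain-Python sketch of the idea follows; the salt count and the helper `salted_key` are assumptions for illustration, and in Spark the salt would be an extra column added before the shuffle.

```python
from collections import Counter

N_SALTS = 4  # assumed number of sub-keys per original key

def salted_key(key, row_index, n_salts=N_SALTS):
    """Return a temporary (key, salt) pair; the salt cycles through 0..n_salts-1."""
    return (key, row_index % n_salts)

# Skewed sample: year 0 dominates, as it does in the song data.
years = [0] * 43 + [2004] * 4 + [1994] * 2

before = Counter(years)
after = Counter(salted_key(y, i) for i, y in enumerate(years))

print(max(before.values()))  # 43: largest group before salting
print(max(after.values()))   # 11: largest group after salting
```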

The files are partitioned by the first three letters of each song's track ID. <br>
**For example, here are filepaths to two files in this dataset.** <br>
`song_data/A/B/C/TRABCEI128F424C983.json` <br>
`song_data/A/A/B/TRAABJL12903CDCF1A.json`
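
Judging from the two example paths, the three letters are the ones that follow the `TR` prefix of the track ID. A small sketch that rebuilds a path from a track ID (`song_path` is a hypothetical helper, not part of the project code):

```python
def song_path(track_id, root="song_data"):
    """Build the nested path from the three characters after the 'TR' prefix."""
    a, b, c = track_id[2:5]
    return f"{root}/{a}/{b}/{c}/{track_id}.json"

print(song_path("TRABCEI128F424C983"))  # song_data/A/B/C/TRABCEI128F424C983.json
```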

In [10]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [11]:
# local mode (one driver process, no cluster): defaultParallelism is the number of logical cores Spark uses, 4 on this machine. "Core i5" is a CPU model name, not a core count
sc.defaultParallelism 

4

In [12]:
df_songs.groupBy('year').count().sort('year').show()

+----+-----+
|year|count|
+----+-----+
|   0|   43|
|1961|    1|
|1964|    1|
|1969|    1|
|1972|    1|
|1982|    1|
|1984|    1|
|1985|    1|
|1986|    1|
|1987|    1|
|1992|    1|
|1993|    1|
|1994|    2|
|1997|    2|
|1999|    1|
|2000|    2|
|2003|    2|
|2004|    4|
|2005|    2|
|2007|    1|
+----+-----+
only showing top 20 rows



In [13]:
df_songs.groupBy('artist_id').count().sort(desc('count')).show()

+------------------+-----+
|         artist_id|count|
+------------------+-----+
|ARNTLGG11E2835DDB9|    2|
|ARD7TVE1187B99BFB1|    2|
|AR9AWNF1187B9AB0B4|    1|
|AR0RCMP1187FB3F427|    1|
|AR10USD1187B99F3F1|    1|
|ARB29H41187B98F0EF|    1|
|ARMAC4T1187FB3FA4C|    1|
|AREBBGV1187FB523D2|    1|
|ARKFYS91187B98E58F|    1|
|ARLTWXK1187FB5A3F8|    1|
|ARPFHN61187FB575F6|    1|
|ARDR4AC1187FB371A1|    1|
|AR47JEX1187B995D81|    1|
|ARPBNLO1187FB3D52F|    1|
|ARQGYP71187FB44566|    1|
|ARMBR4Y1187B9990EB|    1|
|AROUOZZ1187B9ABE51|    1|
|ARNF6401187FB57032|    1|
|ARI2JSK1187FB496EF|    1|
|ARDNS031187B9924F0|    1|
+------------------+-----+
only showing top 20 rows



In [14]:
df_songs.groupBy(['year', 'artist_id']).count().sort(desc('count')).show()

+----+------------------+-----+
|year|         artist_id|count|
+----+------------------+-----+
|   0|ARD7TVE1187B99BFB1|    2|
|   0|ARNTLGG11E2835DDB9|    2|
|   0|ARPFHN61187FB575F6|    1|
|   0|ARI2JSK1187FB496EF|    1|
|2000|ARPBNLO1187FB3D52F|    1|
|1992|AR0RCMP1187FB3F427|    1|
|   0|ARMBR4Y1187B9990EB|    1|
|1985|AR47JEX1187B995D81|    1|
|   0|AR9AWNF1187B9AB0B4|    1|
|1972|ARB29H41187B98F0EF|    1|
|1994|ARNF6401187FB57032|    1|
|   0|AR10USD1187B99F3F1|    1|
|   0|ARKFYS91187B98E58F|    1|
|2005|ARDNS031187B9924F0|    1|
|   0|ARDR4AC1187FB371A1|    1|
|   0|ARLTWXK1187FB5A3F8|    1|
|   0|AREBBGV1187FB523D2|    1|
|2004|ARMAC4T1187FB3FA4C|    1|
|   0|ARQGYP71187FB44566|    1|
|1997|AROUOZZ1187B9ABE51|    1|
+----+------------------+-----+
only showing top 20 rows



In [15]:
df_songs = df_songs.repartition('year', 'artist_id')

<br>

<br>

## Artists Table

artist_id, name, location, <font color='red'>latitude</font>, longitude

https://stackoverflow.com/questions/34077353/how-to-change-dataframe-column-names-in-pyspark <br>
https://knowledge.udacity.com/questions/678149

In [16]:
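# extract columns to create artists table
t0 = time()

# strip the 'artist_' prefix from each column name
# note: no dropDuplicates() here, so artists with several songs appear more than once
cols = ['artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
cols = [c + ' as ' + c[len('artist_'):] for c in cols]
artists_table = df_songs.selectExpr('artist_id', *cols)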

print(time()-t0)
print(artists_table.count())
artists_table.printSchema()
artists_table.limit(5).toPandas()

0.03940892219543457
71
root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARPFHN61187FB575F6,Lupe Fiasco,"Chicago, IL",41.88415,-87.63241
1,ARI2JSK1187FB496EF,Nick Ingman;Gavyn Wright,"London, England",51.50632,-0.12714
2,ARMBR4Y1187B9990EB,David Martin,California - SF,37.77916,-122.42005
3,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
4,AR9AWNF1187B9AB0B4,Kenny G featuring Daryl Hall,"Seattle, Washington USA",,


In [17]:
# # extract columns to create artists table
# t0 = time()

# columns = ['artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
# columns = [col + ' as ' + col.replace('artist_', '') for col in columns]
# artists_table = df_songs.selectExpr('artist_id', *columns)

# print(time()-t0)
# print(artists_table.count())
# artists_table.printSchema()
# artists_table.limit(5).toPandas()

In [18]:
# # extract columns to create artists table
# t0 = time()

# artists_table = df_songs.selectExpr(["artist_id",
#                            "artist_name as name", 
#                            "artist_location as location", 
#                            "artist_latitude as latitude",
#                            "artist_longitude as longitude"])\
#                   .dropDuplicates()

# print(time()-t0)
# print(artists_table.count())
# artists_table.printSchema()
# artists_table.limit(5).toPandas()

In [19]:
# artists_table.withColumn("prefix", substring('name', 1, 2))\
#              .write.mode("overwrite")\
#              .partitionBy("prefix")\
#              .parquet("./artists")

In [20]:
# write artists table to parquet files
artists_table.write\
.parquet('./artists', mode='overwrite')

<br>

# Log Data

In [21]:
# get filepath to log data file
log_data = "data/log_data/*.json"

# read log data file
df_logs = spark.read.json(log_data)

print(df_logs.count())
df_logs.printSchema()
df_logs.limit(5).toPandas()

8056
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [22]:
# filter by actions for song plays
df_logs = df_logs.filter(" page = 'NextSong'")
# OR
# df_logs = df_logs.filter(df_logs['page'] == "NextSong")
# OR
# df_logs = df_logs.where(df_logs['page'] == "NextSong")



print(df_logs.count())
df_logs.printSchema()
df_logs.limit(5).toPandas()

6820
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


<br>

## Users Table

user_id, first_name, last_name, gender, level

In [23]:
# # extract columns for users table    
# users_table = df_logs.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()\
# .withColumnRenamed('userId', 'user_id')\
# .withColumnRenamed('lastName', 'last_name')\
# .withColumnRenamed('firstName', 'first_name')

# print(users_table.count())
# users_table.printSchema()
# users_table.limit(5).toPandas()

In [24]:
# extract columns for users table    
users_table = df_logs.selectExpr("userId as user_id", 
                                 "firstName as first_name",
                                 "lastName as last_name",
                                 "gender", 
                                 "level").dropDuplicates()

print(users_table.count())
users_table.printSchema()
users_table.limit(5).toPandas()

104
root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,81,Sienna,Colon,F,free
2,75,Joseph,Gutierrez,M,free
3,16,Rylan,George,M,paid
4,54,Kaleb,Cook,M,free


In [25]:
# extract columns for users table, keeping only each user's most recent row
# from pyspark.sql import Window
# users_table = df_logs.selectExpr('userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level', 'ts')
# users_window = Window.partitionBy('user_id').orderBy(F.desc('ts'))
# users_table = users_table.withColumn('row_number', F.row_number().over(users_window))
# users_table = users_table.where(users_table.row_number == 1).drop('ts', 'row_number')
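
`dropDuplicates()` on all five columns keeps one row per distinct combination, so a user whose `level` changed shows up twice. The commented-out window query above avoids that by keeping only each user's most recent event. The same idea in plain Python, with made-up rows and a hypothetical helper `latest_per_user`:

```python
def latest_per_user(rows):
    """Keep, for every user_id, the row with the largest ts, then drop ts."""
    latest = {}
    for row in rows:
        current = latest.get(row["user_id"])
        if current is None or row["ts"] > current["ts"]:
            latest[row["user_id"]] = row
    return [{k: v for k, v in row.items() if k != "ts"} for row in latest.values()]

# Made-up rows: user 16 upgrades from free to paid.
rows = [
    {"user_id": "16", "level": "free", "ts": 100},
    {"user_id": "26", "level": "free", "ts": 150},
    {"user_id": "16", "level": "paid", "ts": 200},
]
print(latest_per_user(rows))
```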

In [26]:
# write users table to parquet files
users_table.write.parquet("./users", mode="overwrite")

<br>

## Time Table
start_time, hour, day, week, month, year, weekday

In [27]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda ts: datetime.fromtimestamp(ts/1000), TimestampType())
df_logs = df_logs.withColumn('start_time', get_timestamp('ts'))
    
# # create datetime column from original timestamp column
# get_datetime = udf()
# df_logs = 

print(df_logs.count())
df_logs.printSchema()
df_logs.limit(5).toPandas()

6820
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



                                                                                

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 02:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 02:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 02:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 05:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 07:48:55.796
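
`datetime.fromtimestamp(ts/1000)` without a `tz` argument converts to the local time zone of the machine running the code, so `start_time` can differ between a laptop and a cluster. A small sketch of a zone-independent conversion, using the `ts` of the first row above (`ms_to_utc` is a hypothetical helper, not part of the notebook):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def ms_to_utc(ts_ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ts_ms)

first_event = ms_to_utc(1542241826796)  # ts of the first row shown above
print(first_event.isoformat())  # 2018-11-15T00:30:26.796000+00:00
```

This gives 00:30:26 UTC, while the output above shows 02:30:26, which is consistent with the notebook having run on a machine two hours ahead of UTC.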


In [28]:
# # create timestamp column from original timestamp column
# get_timestamp = udf(lambda ts: datetime.fromtimestamp(ts/1000).isoformat())
# df_logs = df_logs.withColumn('start_time', get_timestamp('ts').cast(TimestampType()))
    
# # # create datetime column from original timestamp column
# # get_datetime = udf()
# # df_logs = 

In [29]:
# extract columns to create time table
time_table = df_logs.select('start_time')
# pair each output column name with the function that derives it from start_time
# (the loop variable is not called `col`, so the imported `col` function is not overwritten)
funcs = [F.hour, F.dayofmonth, F.weekofyear, F.month, F.year, F.dayofweek]
names = ['hour', 'day', 'week', 'month', 'year', 'weekday']
for name, func in zip(names, funcs):
    time_table = time_table.withColumn(name, func('start_time'))
    
print(time_table.count())
time_table.printSchema()
time_table.limit(5).toPandas() 

6820
root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



                                                                                

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 02:30:26.796,2,15,46,11,2018,5
1,2018-11-15 02:41:21.796,2,15,46,11,2018,5
2,2018-11-15 02:45:41.796,2,15,46,11,2018,5
3,2018-11-15 05:44:09.796,5,15,46,11,2018,5
4,2018-11-15 07:48:55.796,7,15,46,11,2018,5
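
The `week` and `weekday` values follow Spark's conventions: `weekofyear` is the ISO 8601 week number and `dayofweek` counts from 1 = Sunday to 7 = Saturday. A plain-Python sketch that reproduces the first output row (`time_fields` is a hypothetical helper for illustration):

```python
from datetime import datetime

def time_fields(start_time):
    """Return hour, day, week, month, year, weekday for one timestamp."""
    return {
        "hour": start_time.hour,
        "day": start_time.day,
        "week": start_time.isocalendar()[1],         # ISO week number
        "month": start_time.month,
        "year": start_time.year,
        "weekday": start_time.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }

print(time_fields(datetime(2018, 11, 15, 2, 30, 26, 796000)))
```

15 November 2018 was a Thursday, hence `weekday` 5 under the Sunday-first numbering.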


In [30]:
# # extract columns to create time table
# time_table = df_logs.select('start_time')
# time_table = time_table.withColumn('hour', F.hour('start_time'))\
#                        .withColumn('day', F.dayofmonth('start_time'))\
#                        .withColumn('week', F.weekofyear('start_time'))\
#                        .withColumn('month', F.month('start_time'))\
#                        .withColumn('year', F.year('start_time'))\
#                        .withColumn('weekday', F.dayofweek('start_time'))

# print(time_table.count())
# time_table.printSchema()
# time_table.limit(5).toPandas() 

In [31]:
# # extract columns to create time table
# time_table = df_logs.select('start_time')
# time_table = time_table.withColumn('hour', F.hour('start_time'))
# time_table = time_table.withColumn('day', F.dayofmonth('start_time'))
# time_table = time_table.withColumn('week', F.weekofyear('start_time'))
# time_table = time_table.withColumn('month', F.month('start_time'))
# time_table = time_table.withColumn('year', F.year('start_time'))
# time_table = time_table.withColumn('weekday', F.dayofweek('start_time'))

# print(time_table.count())
# time_table.printSchema()
# time_table.limit(5).toPandas() 

In [32]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").parquet("times", mode="overwrite")

                                                                                

<br>

## SongPlay Table

In [33]:
# # read in song data to use for songplays table
# song_df = spark.read.parquet("songs/*/*/*.parquet")

In [34]:
# extract columns from joined song and log datasets to create songplays table
df_logs = df_logs.orderBy('ts')
df_logs = df_logs.withColumn('songplay_id', F.monotonically_increasing_id())

df_songs.createOrReplaceTempView('songs')
df_logs.createOrReplaceTempView('events')

# include year and month to allow parquet partitioning
songplays_table = spark.sql("""
        SELECT
            e.songplay_id,
            e.start_time,
            e.userId as user_id,
            e.level,
            s.song_id,
            s.artist_id,
            e.sessionId as session_id,
            e.location,
            e.userAgent as user_agent,
            year(e.start_time) as year,
            month(e.start_time) as month
        FROM events e
        LEFT JOIN songs s ON
            e.song = s.title AND
            e.artist = s.artist_name AND
            ABS(e.length - s.duration) < 2
""")

In [35]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet('./songplays', 
                              partitionBy=['year', 'month'],
                              mode="overwrite")
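
A note on the `songplay_id` column: `F.monotonically_increasing_id()` guarantees unique, increasing IDs, but not consecutive ones. Per the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The arithmetic can be sketched in plain Python (`simulated_id` is a hypothetical helper that only mimics that layout):

```python
def simulated_id(partition_id, record_number):
    """Mimic the documented bit layout: partition in the upper bits, record in the lower 33."""
    return (partition_id << 33) + record_number

# Two partitions with three records each.
ids = [simulated_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

So `songplay_id` values may jump by large amounts between partitions; if consecutive numbering is required, a `row_number()` window ordered by `ts` is one alternative.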

                                                                                

**A quick question to brainstorm here: how can you partition the data without including the partition columns in the dataset?** 
- The partition columns are not stored inside the Parquet files; they are only used to name the output directories (for example `year=2018/month=11`).
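
With `partitionBy`, Spark writes one directory per partition value using the `column=value` naming scheme, and the values are recovered from the directory names when the data is read back. A sketch of that naming scheme (the helper `partition_dir` is hypothetical; Spark builds these paths itself):

```python
def partition_dir(base, **partition_values):
    """Build a Hive-style directory path such as base/year=2018/month=11."""
    parts = [f"{name}={value}" for name, value in partition_values.items()]
    return "/".join([base, *parts])

print(partition_dir("songplays", year=2018, month=11))  # songplays/year=2018/month=11
```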

<br>

### RUN PYTHON SCRIPTS

In [47]:
# ### How to run the Python scripts
# We can run python scripts either:
# - in the notebook:
#     - %run -i '<filename>.py'
#     - !python <filename>.py

!python etl.py 

### (Using S3 with Local Mode!!)
### This is not recommended at all in production:
### we are loading the data all the way from Amazon S3
### into the memory of the machine running the Jupyter notebook.

22/10/12 04:14:29 WARN Utils: Your hostname, Mahmouds-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.2 instead (on interface en0)
22/10/12 04:14:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/mnagy99/opt/anaconda3/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/mnagy99/.ivy2/cache
The jars for the packages stored in: /Users/mnagy99/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-66559092-4991-4002-94c4-1fd5cc3edb29;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.0 in central
	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli

	0 artifacts copied, 68 already retrieved (0kB/48ms)
22/10/12 04:14:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/12 04:14:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

- pyspark.sql.utils.AnalysisException: Path does not exist: file:/Users/mnagy99/Desktop/DataLake Project/datasong_data/A/A/A/*.json
- pyspark.sql.utils.AnalysisException: Path does not exist: file:/Users/mnagy99/Desktop/DataLake Project/data/log_data/A/A/A.json
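
The first error shows `data` and `song_data` fused into `datasong_data`, which is what plain string concatenation produces when the base path lacks a trailing slash. A small sketch of the difference, with hypothetical variable names (the actual code in `etl.py` is not shown here):

```python
import posixpath

input_data = "data"                    # base path without a trailing slash
pattern = "song_data/A/A/A/*.json"

broken = input_data + pattern          # separator is missing
fixed = posixpath.join(input_data, pattern)

print(broken)  # datasong_data/A/A/A/*.json
print(fixed)   # data/song_data/A/A/A/*.json
```

`posixpath.join` always joins with `/`, which also suits `s3a://` locations regardless of the operating system running the script.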

<br>

<br>