# Project Data Lake with Spark and S3

In [42]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, desc
import os
import configparser

In [4]:
config = configparser.ConfigParser()

config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['default']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['default']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [5]:
configure = SparkConf().setAppName("app name").setMaster("local")
sc = SparkContext(conf = configure)
spark = SparkSession.builder\
                    .config('fs.s3a.access.key', os.environ['AWS_ACCESS_KEY_ID']) \
                    .config('fs.s3a.secret.key', os.environ['AWS_SECRET_ACCESS_KEY']) \
                    .appName("app name")\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.3")\
                     .getOrCreate()                   

# Analysis

In [6]:
output_data = "data/output/"
song_df = spark.read\
        .format("parquet")\
        .option("basePath", os.path.join(output_data, "songs/"))\
        .load(os.path.join(output_data, "songs/*/*/"))


In [7]:
song_df.printSchema()
song_df.show(3)

root
 |-- song_id: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)

+------------------+---------+--------------------+----+------------------+
|           song_id| duration|               title|year|         artist_id|
+------------------+---------+--------------------+----+------------------+
|SOAOIBZ12AB01815BE| 43.36281|I Hold Your Hand ...|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|186.48771|I Think My Wife I...|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|326.00771|A Whiter Shade Of...|   0|ARLTWXK1187FB5A3F8|
+------------------+---------+--------------------+----+------------------+
only showing top 3 rows



In [8]:
output_data = "data/output/"
artists_df = spark.read\
        .format("parquet")\
        .option("basePath", os.path.join(output_data, "artists/"))\
        .load(os.path.join(output_data, "artists/*"))

In [9]:
artists_df.printSchema()
artists_df.show(3)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- year: long (nullable = true)
 |-- num_songs: long (nullable = true)

+------------------+---------------+----------------+-----------------+--------------------+----+---------+
|         artist_id|artist_latitude|artist_longitude|  artist_location|         artist_name|year|num_songs|
+------------------+---------------+----------------+-----------------+--------------------+----+---------+
|ARDR4AC1187FB371A1|           null|            null|                 |Montserrat Caball...|   0|        1|
|AREBBGV1187FB523D2|           null|            null|      Houston, TX|Mike Jones (Featu...|   0|        1|
|ARMAC4T1187FB3FA4C|       40.82624|       -74.47995|Morris Plains, NJ|The Dillinger Esc...|2004|        1|
+------------------+---------------+-----------

In [10]:
output_data = "data/output/"
songplays_df = spark.read\
        .format("parquet")\
        .option("basePath", os.path.join(output_data, "songplays/"))\
        .load(os.path.join(output_data, "songplays/*"))
songplays_df.printSchema()
songplays_df.show(3)

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|2018-11-14 22:30:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|2018-11-14 22:41:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|

In [31]:
song_df = song_df.alias("s")
artists_df = artists_df.alias("a")
songplays_df = songplays_df.alias("sp")
sparkify_table = songplays_df.join(
        song_df, 
        songplays_df.song_id == song_df.song_id, 
        'left_outer')\
        .join(
            artists_df,
            artists_df.artist_id == artists_df.artist_id, 'left_outer')
        #.select(col('s.year').alias('s_year'))

In [30]:
sparkify_table.printSchema()
sparkify_table.show(1)

root
 |-- s_year: integer (nullable = true)

+------+
|s_year|
+------+
|  null|
+------+
only showing top 1 row



In [36]:
songs_in_hour = sparkify_table.filter(col('sp.year') == 2018).select(col('a.artist_name'))
songs_in_hour.show()

+--------------------+
|         artist_name|
+--------------------+
|Montserrat Caball...|
|Mike Jones (Featu...|
|The Dillinger Esc...|
|            Tiny Tim|
|          Tim Wilson|
|   Sophie B. Hawkins|
|         King Curtis|
|         Lupe Fiasco|
|Nick Ingman;Gavyn...|
|         Willie Bobo|
|    Billie Jo Spears|
|Kenny G featuring...|
|        SUE THOMPSON|
|Jeff And Sheri Ea...|
|Tweeterfriendly M...|
|       Terry Callier|
|        Jimmy Wakely|
|        David Martin|
|      Bombay Rockers|
|           Tom Petty|
+--------------------+
only showing top 20 rows



In [43]:
top_users = sparkify_table.groupby(col('sp.user_id')).count().orderBy(desc('count'))
top_users.show()

+-------+-----+
|user_id|count|
+-------+-----+
|     49|48919|
|     80|47215|
|     97|39547|
|     15|32873|
|     44|28187|
|     29|24566|
|     24|22791|
|     73|20519|
|     88|19170|
|     36|17608|
|     16|15833|
|     95|15123|
|     85|12709|
|     30|12638|
|     25|11999|
|     58| 9940|
|     42| 9940|
|     26| 8094|
|     82| 6177|
|     72| 5112|
+-------+-----+
only showing top 20 rows



In [44]:
output_data = "data/output/"
users_df = spark.read\
        .format("parquet")\
        .option("basePath", os.path.join(output_data, "users/"))\
        .load(os.path.join(output_data, "users/*"))
users_df.printSchema()
users_df.show(3)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     88|  Mohammad|Rodriguez|     M| free|
|     88|  Mohammad|Rodriguez|     M| paid|
|     75|    Joseph|Gutierrez|     M| free|
+-------+----------+---------+------+-----+
only showing top 3 rows



In [49]:
top_user_data = top_users.join(
    users_df,
    users_df.user_id == top_users.user_id,
    'inner'
).dropDuplicates()

In [51]:
top_user_data.orderBy(desc('count')).show()

+-------+-----+-------+----------+---------+------+-----+
|user_id|count|user_id|first_name|last_name|gender|level|
+-------+-----+-------+----------+---------+------+-----+
|     49|48919|     49|     Chloe|   Cuevas|     F| free|
|     49|48919|     49|     Chloe|   Cuevas|     F| paid|
|     80|47215|     80|     Tegan|   Levine|     F| free|
|     80|47215|     80|     Tegan|   Levine|     F| paid|
|     97|39547|     97|      Kate|  Harrell|     F| paid|
|     15|32873|     15|      Lily|     Koch|     F| paid|
|     15|32873|     15|      Lily|     Koch|     F| free|
|     44|28187|     44|    Aleena|    Kirby|     F| paid|
|     29|24566|     29|Jacqueline|    Lynch|     F| free|
|     29|24566|     29|Jacqueline|    Lynch|     F| paid|
|     24|22791|     24|     Layla|  Griffin|     F| paid|
|     73|20519|     73|     Jacob|    Klein|     M| paid|
|     88|19170|     88|  Mohammad|Rodriguez|     M| free|
|     88|19170|     88|  Mohammad|Rodriguez|     M| paid|
|     36|17608