In [1]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col, desc, sum as Fsum, row_number, to_timestamp, from_unixtime, trim, lower
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, StringType as Str, IntegerType as Int, DoubleType as Dbl, TimestampType
from pyspark.sql.types import LongType as Lng
from pyspark.sql.window import Window

In [2]:
# Load AWS credentials from the local dl.cfg file and export them as
# environment variables for the current process.
config = configparser.ConfigParser()
config.read('dl.cfg')

for _aws_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[_aws_key] = config.get('AWS', _aws_key)

In [5]:
# Create (or reuse) the Spark session with the hadoop-aws package on the
# classpath so that s3a:// paths can be read and written.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5")
    .getOrCreate()
)
# Use a small number of shuffle partitions for this session.
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [4]:
# Read the song metadata JSON files under the A/A/A prefix of song_data.
song_data_path = 's3a://udacity-dend/song_data/A/A/A'
df_song = spark.read.json(song_data_path)

In [5]:
# Preview the first five song records.
df_song.show(n=5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
|ARA23XO1187B9AF18F|       40.57885|Carteret, New Jersey|       -74.21956|     The Smithereens|  192.522|        1|SOKTJDS12AF72A25E5|Drown In My Own T...|   0|
|ARSVTNL1187B992A91|       51.50632|     London, England|        -0.12714|       Jonathan King|129.85424|        1|SOEKAZG12AB018837E|I'll Slap Your Fa...|2001|
|AR73AIO1187B9AD57B|       37.7791

In [6]:
# Print the schema Spark inferred for the song JSON files.
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
# Songs table: project the song attributes and drop fully identical rows.
song_columns = [
    'song_id',
    'title',
    'artist_id',
    'year',
    'duration',
]
song_data = df_song.select(*song_columns).dropDuplicates()
# Row count of the deduplicated songs table (displayed as the cell output).
song_data.count()

24

In [8]:
# Artists table: SQL expressions that select the artist attributes and strip
# the "artist_" prefix from location / latitude / longitude.
artist_columns = [
    'artist_id',
    'artist_name',
    'artist_location as location',
    'artist_latitude as latitude',
    'artist_longitude as longitude',
]
artist_data = (
    df_song
    .selectExpr(*artist_columns)
    .dropDuplicates()
)
# Row count of the deduplicated artists table (displayed as the cell output).
artist_data.count()

24

In [9]:
# Read log_json_path.json and keep the schema Spark infers for that file.
# NOTE(review): schema_json is not used by any later cell visible here, and
# the file name suggests a JSONPaths mapping rather than sample log data, so
# the inferred schema may not describe the log events - confirm before use.
schema_json=spark.read.json("s3a://udacity-dend/log_json_path.json").schema

In [10]:
# Explicit schema for the event-log JSON files.
#
# `ts` holds epoch timestamps in milliseconds (e.g. 1542241826796), which do
# not fit in a 32-bit IntegerType; it is declared as LongType so the values
# are not lost when the schema is applied. This also matches the `long` type
# Spark infers for `ts` when no schema is supplied.
LogDataSchema = R([
    Fld("artist", Str()),
    Fld("auth", Str()),
    Fld("firstName", Str()),
    Fld("gender", Str()),
    Fld("itemInSession", Int()),
    Fld("lastName", Str()),
    Fld("length", Dbl()),
    Fld("level", Str()),
    Fld("location", Str()),
    Fld("method", Str()),
    Fld("page", Str()),
    Fld("registration", Dbl()),
    Fld("sessionId", Int()),
    Fld("song", Str()),
    Fld("status", Int()),
    Fld("ts", Lng()),  # epoch milliseconds: needs 64 bits
    Fld("userAgent", Str()),
    Fld("userId", Str()),
])

In [4]:
# Load every event-log JSON file and keep only song-play events.
# The schema is inferred by Spark; no explicit schema is passed here.
log_df = (
    spark.read.json('s3a://udacity-dend/log-data/*/*/*.json')
    .filter(col('page') == 'NextSong')
)

In [5]:
# Preview the first five song-play events.
log_df.show(n=5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [6]:
# Print the schema Spark inferred for the event-log JSON files.
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
# SQL select expressions for the users table (camelCase -> snake_case).
user_cols = [
    'userId as user_id',
    'firstName as first_name',
    'lastName as last_name',
    'gender',
    'level',
]

In [8]:
# Per-user window with events ordered from newest to oldest timestamp.
_newest_first = desc('ts')
window = Window.partitionBy('userId').orderBy(_newest_first)

In [9]:
# Number each user's events from newest (1) to oldest using the per-user
# window; LastLevel == 1 therefore marks the user's most recent event.
# The result is cached because several later cells reuse it.
_event_rank = row_number().over(window)
log_df = log_df.withColumn('LastLevel', _event_rank).cache()

In [10]:
# Users table: one row per user, taken from the user's most recent event
# (LastLevel == 1) so that `level` reflects the latest subscription level.
#
# DataFrame.show() returns None, so the DataFrame is assigned first and
# displayed in a separate statement; otherwise `user` would be None.
user = (
    log_df
    .filter(col('LastLevel') == 1)
    .orderBy(desc('ts'))
    .selectExpr(*user_cols)
    .dropDuplicates()
)
user.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     23|    Morris|  Gilmore|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [11]:
# Count the rows of the users table; compared in the next cell with the
# number of distinct userId values.
(
    log_df
    .where(col('LastLevel') == 1)
    .orderBy(desc('ts'))
    .selectExpr(*user_cols)
    .dropDuplicates()
    .count()
)

96

In [12]:
# Number of distinct user ids present in the song-play events.
log_df.select('userId').distinct().count()

96

In [13]:
# SQL select expressions for the songplays fact table.
songplay_cols = [
    'start_time',
    'userId as user_id',
    'level',
    'song_id',
    'artist_id',
    'sessionId as session_id',
    'location',
    'userAgent as user_agent',
]

In [14]:
# Songplays fact table: each song-play event enriched with song_id and
# artist_id from the song metadata.
#
# start_time is built with the same expression as the time table
# (to_timestamp of ts in seconds) so both tables share an identical
# millisecond-precision timestamp key. from_unixtime would instead yield a
# second-precision string that cannot be matched against time.start_time.
#
# Requires df_song (song metadata) to be loaded in the current kernel.
_title_matches = lower(trim(log_df.song)) == lower(trim(df_song.title))
_artist_matches = lower(trim(log_df.artist)) == lower(trim(df_song.artist_name))

songplay = (
    log_df
    .withColumn('start_time', to_timestamp(col('ts') / 1000))
    # Left join: plays with no matching song keep null song_id / artist_id.
    .join(df_song, on=[_title_matches, _artist_matches], how='left')
    .selectExpr(*songplay_cols)
)

NameError: name 'df_song' is not defined

In [15]:
# Time dimension: one row per distinct song-play timestamp, broken down into
# calendar parts.
#
# The timestamp is projected and de-duplicated BEFORE the calendar columns
# are derived. De-duplicating the full log rows (as opposed to the projected
# start_time) would leave repeated start_time values in the result, because
# two different events can share the same timestamp.
time = (
    log_df
    .select(to_timestamp(col('ts') / 1000).alias('start_time'))
    .dropDuplicates()
    .withColumn('hour', F.hour('start_time'))
    .withColumn('day', F.dayofmonth('start_time'))
    .withColumn('week', F.weekofyear('start_time'))
    .withColumn('month', F.month('start_time'))
    .withColumn('year', F.year('start_time'))
    .withColumn('weekday', F.dayofweek('start_time'))
    .select(['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday'])
)

In [16]:
# Preview the time table (first 20 rows).
time.show(n=20)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-20 00:32:...|   0| 20|  47|   11|2018|      3|
|2018-11-15 16:36:...|  16| 15|  46|   11|2018|      5|
|2018-11-15 16:33:...|  16| 15|  46|   11|2018|      5|
|2018-11-15 16:31:...|  16| 15|  46|   11|2018|      5|
|2018-11-13 01:12:...|   1| 13|  46|   11|2018|      3|
|2018-11-12 17:24:...|  17| 12|  46|   11|2018|      2|
|2018-11-12 14:29:...|  14| 12|  46|   11|2018|      2|
|2018-11-12 14:25:...|  14| 12|  46|   11|2018|      2|
|2018-11-12 14:20:...|  14| 12|  46|   11|2018|      2|
|2018-11-12 14:16:...|  14| 12|  46|   11|2018|      2|
|2018-11-12 14:09:...|  14| 12|  46|   11|2018|      2|
|2018-11-12 14:06:...|  14| 12|  46|   11|2018|      2|
|2018-11-12 14:02:...|  14| 12|  46|   11|2018|      2|
|2018-11-04 06:38:...|   6|  4|  44|   11|2018|      1|
|2018-11-04 06:34:...|   6|  4|  44|   11|2018| 

In [24]:
# Configure the S3A filesystem on the running Spark context with the
# credentials from dl.cfg and the S3 endpoint, then write the time table
# to S3 as parquet.
_hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
_s3a_settings = {
    "fs.s3a.access.key": config.get('AWS', 'AWS_ACCESS_KEY_ID'),
    "fs.s3a.secret.key": config.get('AWS', 'AWS_SECRET_ACCESS_KEY'),
    "fs.s3a.endpoint": "s3.amazonaws.com",
}
for _setting_name, _setting_value in _s3a_settings.items():
    _hadoop_conf.set(_setting_name, _setting_value)

# NOTE(review): no save mode is given, so Spark's default applies; re-running
# this cell against an existing output path is expected to fail - confirm
# whether overwrite is wanted.
time.write.parquet('s3a://neela-bucket-1/time/time.parquet')

In [25]:
# Display the current local date and time as the cell output.
datetime.now()

datetime.datetime(2020, 5, 2, 21, 23, 43, 747598)