In [1]:
import findspark
findspark.init('/home/nikhil/spark-2.4.4-bin-hadoop2.7/')
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('sql').getOrCreate()


In [2]:
user_log=spark.read.json('data/sparkify_log_small.json')

In [3]:
user_log.take(1)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')]

## Create a view for the sql
###### It is the temp view if we end the session the createTempview does't work because temp view

In [15]:
user_log.createOrReplaceTempView("user_log_table")

In [17]:
spark.sql("SELECT * FROM user_log_table limit 2").show()

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541284|     5027|      

In [19]:
spark.sql('''
select count(*) from user_log_table''').show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [21]:
spark.sql('''
select userID,firstname,page,song
from user_log_table where userID=='1046'
''').show()

+------+---------+--------+--------------------+
|userID|firstname|    page|                song|
+------+---------+--------+--------------------+
|  1046|  Kenneth|NextSong|Christmas Tears W...|
|  1046|  Kenneth|NextSong|  Be Wary Of A Woman|
|  1046|  Kenneth|NextSong|   Public Enemy No.1|
|  1046|  Kenneth|NextSong|Reign Of The Tyrants|
|  1046|  Kenneth|NextSong|      Father And Son|
|  1046|  Kenneth|NextSong|               No. 5|
|  1046|  Kenneth|NextSong|           Seventeen|
|  1046|  Kenneth|    Home|                null|
|  1046|  Kenneth|NextSong|          War on war|
|  1046|  Kenneth|NextSong|   Killermont Street|
|  1046|  Kenneth|NextSong|        Black & Blue|
|  1046|  Kenneth|  Logout|                null|
|  1046|  Kenneth|    Home|                null|
|  1046|  Kenneth|NextSong|     Heads Will Roll|
|  1046|  Kenneth|NextSong|Bleed It Out [Liv...|
|  1046|  Kenneth|NextSong|              Clocks|
|  1046|  Kenneth|NextSong|           Love Rain|
|  1046|  Kenneth|Ne

## User Defined Functions

In [23]:
from pyspark.sql.functions import udf
import datetime

In [39]:
spark.udf.register('get_hour', lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))

<function __main__.<lambda>(x)>

In [40]:
spark.sql('''
select *,get_hour(ts) as hour
from user_log_table
limit 1
''').collect()

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='23')]

In [41]:
songs_in_hour=spark.sql('''
select get_hour(ts) as hour,count(*) as plays_per_hour
from user_log_table 
where page="NextSong"
group by hour
order by cast(hour as int) asc
''')

In [42]:
songs_in_hour.show()

+----+--------------+
|hour|plays_per_hour|
+----+--------------+
|   0|           375|
|   1|           456|
|   2|           454|
|   3|           382|
|   4|           302|
|   5|           352|
|   6|           276|
|   7|           348|
|   8|           358|
|   9|           375|
|  10|           249|
|  11|           216|
|  12|           228|
|  13|           251|
|  14|           339|
|  15|           462|
|  16|           479|
|  17|           484|
|  18|           430|
|  19|           362|
+----+--------------+
only showing top 20 rows



## Convert the result to pandas dataframe

In [43]:
songs_in_hour_pd=songs_in_hour.toPandas()

In [44]:
print(songs_in_hour_pd)

   hour  plays_per_hour
0     0             375
1     1             456
2     2             454
3     3             382
4     4             302
5     5             352
6     6             276
7     7             348
8     8             358
9     9             375
10   10             249
11   11             216
12   12             228
13   13             251
14   14             339
15   15             462
16   16             479
17   17             484
18   18             430
19   19             362
20   20             295
21   21             257
22   22             248
23   23             369
