## Get a billboard of top 20 songs for the current day.
This notebook performs the following steps:
1. Connect to the `top_tracks` table in PostgreSQL.
2. Transform the data and extract a billboard of the top 20 songs for the current day.
3. Print the top 20 songs billboard as a table.
4. Save the billboard to a CSV file named after the current date.

### 1. Connect to PostgreSQL database

In [1]:
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession

# PostgreSQL database information
# NOTE(review): the user name and password are hardcoded in this notebook;
# load them from environment variables or a secrets store before sharing it.
url = "jdbc:postgresql://localhost:5432/openapi"
table_name = 'top_tracks'
schema = 'lastfm'
user_name = 'api1'
password = 'Open API@1'
driver = "org.postgresql.Driver"

# pyspark parameters for PostgreSQL connection
appName = "PostgreSQL Connection"
master = "local[4]" # Use 4 cores
#jdbc_jar = "D:\\Spark\spark-3.0.1-bin-hadoop3.2\\jars\\postgresql-42.2.18.jar"
conf = SparkConf() \
    .setAppName(appName) \
    .setMaster(master)

# Create a local session.
# Pass the `conf` built above: handing the builder a fresh SparkConf() would
# silently discard the application name and master configured there.
spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()

# Read the whole table over JDBC; `currentSchema` is forwarded to the
# PostgreSQL driver so the unqualified table name resolves inside `schema`.
df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("currentSchema", schema) \
    .option("user", user_name) \
    .option("password", password) \
    .option("driver", driver) \
    .load()

# Print table schema
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- playcount: integer (nullable = true)
 |-- listeners: integer (nullable = true)
 |-- mbid: string (nullable = true)
 |-- extract_date: date (nullable = true)
 |-- url: string (nullable = true)



### 2. Extract a billboard of the top 20 songs for the current day

In [2]:
import datetime
from pyspark.sql.functions import col
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)


def _select_day(tracks, day, prefix):
    """Return the rows of `tracks` extracted on `day`, with prefixed counters.

    The frame is filtered on `extract_date` *before* the projection, so the
    predicate refers to a column that is still present in the frame being
    filtered. The result has the columns `name`, `<prefix>_listeners`,
    `<prefix>_playcount` and `mbid`, in that order.
    """
    return tracks \
        .where(col('extract_date') == day) \
        .selectExpr(
            'name',
            'listeners AS {}_listeners'.format(prefix),
            'playcount AS {}_playcount'.format(prefix),
            'mbid',
        )


df_today = _select_day(df, today, 'today')
df_yesterday = _select_day(df, yesterday, 'yesterday')
print("Today is: {}".format(today))

Today is: 2020-10-25


In [4]:
# Sort on the aliased column that df_today actually exposes; the source
# column `playcount` was renamed to `today_playcount` when df_today was built.
df_today.orderBy(col('today_playcount').desc()).show(5)

+-------------------+---------------+---------------+--------------------+
|               name|today_listeners|today_playcount|                mbid|
+-------------------+---------------+---------------+--------------------+
|              Intro|        1167976|       14057220|773a1557-a974-4f1...|
|         Blue Jeans|         814411|       12805647|e5977c09-8f5a-46b...|
|When You Were Young|        1447917|       11681896|b38f9ded-8ac3-409...|
|           Teardrop|        1363538|       11431588|04a4ac3e-5439-421...|
|          I'm Yours|        1366957|       11029433|a5a2330e-2fff-460...|
+-------------------+---------------+---------------+--------------------+
only showing top 5 rows



In [5]:
# Sort on the aliased column that df_yesterday actually exposes; the source
# column `playcount` was renamed to `yesterday_playcount` when it was built.
df_yesterday.orderBy(col('yesterday_playcount').desc()).show(5)

+-------------------+-------------------+-------------------+--------------------+
|               name|yesterday_listeners|yesterday_playcount|                mbid|
+-------------------+-------------------+-------------------+--------------------+
|         Wonderwall|            1948334|           16190135|31623cce-9717-451...|
|              Intro|            1167825|           14055147|773a1557-a974-4f1...|
|         Blue Jeans|             814295|           12804139|e5977c09-8f5a-46b...|
|When You Were Young|            1447724|           11679670|b38f9ded-8ac3-409...|
|           Teardrop|            1363434|           11429903|04a4ac3e-5439-421...|
+-------------------+-------------------+-------------------+--------------------+
only showing top 5 rows



In [6]:
# Pair today's counters with yesterday's for every track present on both days.
# df_today and df_yesterday are both derived from the same `df`, so a condition
# such as `df_today.mbid == df_yesterday.mbid` compares a column with itself and
# only works through Spark's self-join auto-resolution. Joining on the column
# names is unambiguous and yields a single `mbid` / `name` column.
count_join = df_today.join(df_yesterday, on=['mbid', 'name'], how='inner')\
    .select('name', 'today_playcount', 'yesterday_playcount', 'today_listeners',
            'yesterday_listeners', 'mbid')\
    .withColumn("playcount_diff", col('today_playcount') - col('yesterday_playcount'))\
    .orderBy(col('today_playcount').desc())
# Collect the 20 most-played tracks of today as a list of Row objects.
top_20 = count_join.select('name', 'today_playcount', 'playcount_diff')\
    .orderBy(col('today_playcount').desc())\
    .head(20)

In [7]:
from pyspark.sql.types import *
# Schema of the billboard rows: track name, today's play count and the change
# in play count since yesterday; every field is nullable.
new_schema = StructType() \
    .add('name', StringType(), True) \
    .add('today_playcount', IntegerType(), True) \
    .add('playcount_diff', IntegerType(), True)
# Turn the collected rows back into a DataFrame (rebinding `top_20`) and
# display it.
top_20 = spark.createDataFrame(top_20, schema=new_schema)
top_20.show()

+--------------------+---------------+--------------+
|                name|today_playcount|playcount_diff|
+--------------------+---------------+--------------+
|               Intro|       14057220|          2073|
|          Blue Jeans|       12805647|          1508|
| When You Were Young|       11681896|          2226|
|            Teardrop|       11431588|          1685|
|           I'm Yours|       11029433|          1535|
|    Someone Like You|       10881205|          2559|
|             Lithium|       10537498|          2299|
|           Alejandro|       10014817|          1924|
|Set Fire to the Rain|        9746955|          2260|
|Jigsaw Falling In...|        9476616|          1921|
|            Paradise|        9311973|          1639|
|    Comfortably Numb|        9113468|          1390|
|   Sweet Disposition|        9041639|          1596|
|               Faint|        8996640|          1589|
|All These Things ...|        8945406|          1217|
|     I Kissed a Girl|      

In [8]:
import os

# Save top_20 to file, one CSV per day named after the current date.
csv_file = 'data/top-20-daily/' + str(today) + '.csv'
# to_csv does not create missing parent directories, so make sure the target
# folder exists before writing.
os.makedirs(os.path.dirname(csv_file), exist_ok=True)
top_20.toPandas().to_csv(csv_file, index=False)