In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

we have imported the dependecies required to run the pyspark code

In [2]:
sc = SparkContext("local", "first app")
spark = SparkSession.builder.master("local").appName("Spark ETL Pipeline").getOrCreate()

 create a sparkcontext object

In [3]:
logdata = spark.read.json("jsondata/log_data/*/*/*.json")
songdata = spark.read.json("jsondata/song_data/*/*/*/*.json")



here we have loaded the data from the respective folders into the Dataframe.

### process Songdata

In [4]:
song_data = songdata[['song_id','title','artist_id', 'year', 'duration']]


get the required columns of dataframe as per the schema of table in database.

In [5]:
url = "jdbc:postgresql://127.0.0.1/etl"
properties = {
    "driver": "org.postgresql.Driver",
    "user": "etl",
    "password": "etl"
}

set the properties with the parameters required for jdbc

In [6]:
song_data_db = spark.read.jdbc(url=url,\
    table="songs", \
    properties=properties)
song_data_db.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [7]:
song_data_db.registerTempTable("songsdb")
song_data.registerTempTable("songsnew")
song_insert = spark.sql("select n.* from songsnew n left outer join songsdb d on d.song_id = n.song_id where d.song_id is null")
song_insert.show(10)

+-------+-----+---------+----+--------+
|song_id|title|artist_id|year|duration|
+-------+-----+---------+----+--------+
+-------+-----+---------+----+--------+



In [8]:
song_insert.write.jdbc(url=url, table='songs',mode='append',properties=properties)

### process artist data

In [9]:
artist_data = songdata[['artist_id','artist_name','artist_location', 'artist_latitude', 'artist_longitude']]
artist_data.show(5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |           null|            null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|           null|            null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|       40.82624|       -74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|       40.71455|       -74.00712|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [10]:
artist_db = spark.read.jdbc(url=url,\
    table="artists", \
    properties=properties)
artist_db.show(5)

+---------+-----------+---------------+---------------+----------------+
|artist_id|artist_name|artist_location|artist_latitude|artist_longitude|
+---------+-----------+---------------+---------------+----------------+
+---------+-----------+---------------+---------------+----------------+



In [11]:
artist_data.registerTempTable("artist")
artist_db.registerTempTable("artist_db")
artist_insert = spark.sql("select distinct a.* from artist a left outer join artist_db d on a.artist_id= d.artist_id where d.artist_id is null")
artist_insert.show(5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|           null|            null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|       30.08615|       -94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |           null|            null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington|        38.8991|         -77.029|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [120]:
artist_insert.write.jdbc(url=url, table='artists',mode='append',properties=properties)

### process time data

In [108]:
logdata = logdata[logdata['page'] == 'NextSong']
logdata.show(4)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [109]:
logdata.registerTempTable("log_data")
time_data = spark.sql("select FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss.sss') as start_time,hour(from_unixtime(ts/1000)) as hour,day(from_unixtime(ts/1000)) as day,weekofyear(from_unixtime(ts/1000))  as week,month(from_unixtime(ts/1000)) as month,year(from_unixtime(ts/1000)) as year,weekday(from_unixtime(ts/1000)) as weekday from log_data")
time_data.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|50841-09-11 20:26...|  16| 14|  46|   11|2018|      2|
|50841-09-19 10:23...|  16| 14|  46|   11|2018|      2|
|50841-09-22 10:36...|  16| 14|  46|   11|2018|      2|
|50842-01-24 08:03...|  19| 14|  46|   11|2018|      2|
|50842-04-21 00:29...|  21| 14|  46|   11|2018|      2|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [110]:
time_db = spark.read.jdbc(url=url,\
    table="time", \
    properties=properties)
time_db.show(5)

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
+----------+----+---+----+-----+----+-------+



In [111]:
time_db.registerTempTable("timedb")
time_data.registerTempTable("time")
time_insert = spark.sql("select distinct t.* from time t left outer join timedb d on d.start_time = t.start_time where d.hour is null")
time_insert

DataFrame[start_time: string, hour: int, day: int, week: int, month: int, year: int, weekday: int]

In [112]:
time_insert.write.jdbc(url=url, table='time',mode='append',properties=properties)

### process user data

In [113]:
user_df = logdata[['userId', 'firstName', 'lastName', 'gender', 'level']]

user_df= user_df.select(col("userId").alias("user_id").cast(IntegerType()),col("firstname").alias("first_name"),col("lastName").alias("last_name"),col("gender"),col("level"))
user_df

DataFrame[user_id: int, first_name: string, last_name: string, gender: string, level: string]

In [114]:
user_db = spark.read.jdbc(url=url,\
    table="users", \
    properties=properties)
user_db.show(5)
user_db

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
+-------+----------+---------+------+-----+



DataFrame[user_id: int, first_name: string, last_name: string, gender: string, level: string]

In [115]:
user_df.registerTempTable("users")
user_db.registerTempTable("users_db")
user_insert = spark.sql("select distinct u.* from users u left outer join users_db d on u.user_id= d.user_id where d.user_id is null")
user_insert.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     85|   Kinsley|    Young|     F| paid|
|     85|   Kinsley|    Young|     F| free|
|     65|     Amiya| Davidson|     F| paid|
|     53|   Celeste| Williams|     F| free|
|     78|     Chloe|     Roth|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [116]:
user_insert.write.jdbc(url=url, table='users',mode='append',properties=properties)