In [74]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, Row, ArrayType
from pyspark.sql import SQLContext
from pyspark.sql.functions import from_json, explode, col, udf
from pyspark.sql import functions as F
import requests
import os
from dotenv import load_dotenv
from datetime import datetime
import datetime
import json
from sqlalchemy import create_engine as ce
import pandas as pd
import pyspark.pandas as ps
import pymysql


In [6]:
# Build (or reuse) the local Spark session that the later cells read through.
spark = (
    SparkSession.builder
    .master('local')
    .appName('SportifyTracks')
    .getOrCreate()
)

# SQLContext's first positional parameter is a SparkContext, not a
# SparkSession. Hand it the session's own context and bind the session
# explicitly so `sql_context.udf` registers functions on `spark`.
sql_context = SQLContext(spark.sparkContext, sparkSession=spark)



In [None]:
# Tear-down cell: stops the session created by the builder cell above.
# NOTE(review): extract() reads through `spark`, so running this cell in
# notebook order presumably leaves the later cells without a usable session.
# Confirm whether it is meant to be run manually at the very end.
spark.stop()

In [None]:
# Creates a standalone SparkContext named "spotify-etl" on a local master.
# NOTE(review): Spark allows only one active context per JVM, so this
# presumably only succeeds after the session above has been stopped; `sc` is
# not used by the ETL cells further down. Confirm this cell is still needed.
sc = SparkContext(master="local", appName="spotify-etl")

In [None]:
# Bare expression: display the SparkContext's notebook representation.
sc

In [55]:
# Load variables from a .env file into the process environment (python-dotenv).
load_dotenv()

True

In [60]:
def convert_date(date):
    """Return the calendar-date part of an ISO-8601 style timestamp string.

    Everything before the first ``'T'`` is kept, so
    ``'2023-04-04T22:28:13.000Z'`` becomes ``'2023-04-04'``. A string that
    contains no ``'T'`` is returned unchanged.

    Args:
        date: Timestamp string, or ``None``.

    Returns:
        The text before the first ``'T'``, or ``None`` when ``date`` is
        ``None``.
    """
    # played_at is a nullable column and Spark passes nulls to Python UDFs as
    # None; let them through instead of failing on None.split.
    if date is None:
        return None
    return date.partition('T')[0]

In [82]:
def extract(path='./tracks.json'):
    """Read a JSON file into a Spark DataFrame.

    Args:
        path: Location of the JSON file. Defaults to ``'./tracks.json'``,
            the file this function always read before the parameter existed.

    Returns:
        The DataFrame produced by ``spark.read.json`` with ``multiLine=True``
        (the file is parsed as a JSON document that may span several lines
        rather than one record per line).
    """
    return spark.read.json(path, multiLine=True)


def transform(df):
    """Flatten the nested payload into one row per (play, artist).

    Steps, in order:
      1. explode ``items`` so each element becomes its own row,
      2. lift ``track.name`` out of the exploded struct,
      3. explode the track's ``artists`` array (a track with several artists
         therefore yields several rows),
      4. lift the artist name and the ``played_at`` value,
      5. drop the envelope columns and the two helper columns.

    Args:
        df: DataFrame with an ``items`` array column whose elements carry
            ``track`` and ``played_at`` fields.

    Returns:
        DataFrame with ``track_name``, ``artist`` and ``played_at`` columns,
        plus any input column that is not named in the drop list.
    """
    per_play = df.withColumn('col', F.explode('items'))
    with_track = per_play.withColumn('track_name', F.col('col.track.name'))
    per_artist = with_track.withColumn('col2', F.explode('col.track.artists'))
    labelled = (
        per_artist
        .withColumn('artist', F.col('col2.name'))
        .withColumn('played_at', F.col('col.played_at'))
    )
    # Envelope fields of the payload plus the two temporary explode columns.
    discard = ('cursors', 'href', 'limit', 'next', 'items', 'col', 'col2')
    return labelled.drop(*discard)

In [83]:
# Run the extract step and print the schema Spark inferred for the payload.
df = extract()
df.printSchema()

root
 |-- cursors: struct (nullable = true)
 |    |-- after: string (nullable = true)
 |    |-- before: string (nullable = true)
 |-- href: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- context: struct (nullable = true)
 |    |    |    |-- external_urls: struct (nullable = true)
 |    |    |    |    |-- spotify: string (nullable = true)
 |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- uri: string (nullable = true)
 |    |    |-- played_at: string (nullable = true)
 |    |    |-- track: struct (nullable = true)
 |    |    |    |-- album: struct (nullable = true)
 |    |    |    |    |-- album_group: string (nullable = true)
 |    |    |    |    |-- album_type: string (nullable = true)
 |    |    |    |    |-- artists: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |   

In [84]:
# Wrap convert_date as a string-returning Spark UDF and expose it to Spark SQL.
# NOTE(review): the SQL name 'convertGender' does not describe what the
# function does; it looks like a leftover. Confirm before renaming.
convert_date_udf = udf(lambda raw_ts: convert_date(raw_ts), StringType())
sql_context.udf.register('convertGender', convert_date_udf)

# Flatten the raw payload and preview the result.
df_transformed = transform(df)
df_transformed.show()

# Keep one row per track name, trim played_at to its date part, sort by it.
# NOTE(review): dropDuplicates keeps an unspecified row per track_name, so
# which artist/played_at survives for a repeated track is not defined here.
df_final = (
    df_transformed
    .dropDuplicates(['track_name'])
    .select(
        'track_name',
        'artist',
        convert_date_udf('played_at').alias('played_at'),
    )
    .sort('played_at')
)

+--------------------+--------------------+--------------------+
|          track_name|              artist|           played_at|
+--------------------+--------------------+--------------------+
|       Hand Of Blood|Bullet For My Val...|2023-04-04T22:28:...|
|    Waking the Demon|Bullet For My Val...|2023-04-04T22:21:...|
|My Own Summer (Sh...|            Deftones|2023-04-04T22:17:...|
|Not The American ...|   Asking Alexandria|2023-04-04T22:10:...|
|       My Apocalypse|     Escape the Fate|2023-04-04T22:02:...|
|Be Quiet and Driv...|            Deftones|2023-04-04T21:57:...|
|My Own Summer (Sh...|            Deftones|2023-04-04T21:52:...|
|My Own Summer (Sh...|            Deftones|2023-04-04T21:45:...|
|           Vermilion|            Slipknot|2023-04-04T21:38:...|
|     Scream Aim Fire|Bullet For My Val...|2023-04-04T21:21:...|
|The Final Episode...|   Asking Alexandria|2023-04-04T21:16:...|
|Immaculate Miscon...| Motionless In White|2023-04-04T21:12:...|
|    Waking the Demon|Bul

In [72]:
# Connection settings are taken from the environment (load_dotenv() has
# already merged .env into it), so credentials can live outside the notebook.
# Each fallback is the value that used to be hard-coded here, which keeps the
# local development setup working unchanged; set TRACKS_DB_* in .env for any
# other target. Values are inserted verbatim, so URL-escape special
# characters (e.g. '@', '/') in the password.
url = 'mysql+pymysql://{user}:{password}@{host}/{database}'.format(
    user=os.getenv('TRACKS_DB_USER', 'root'),
    password=os.getenv('TRACKS_DB_PASSWORD', 'admin'),
    host=os.getenv('TRACKS_DB_HOST', '172.17.0.3'),
    database=os.getenv('TRACKS_DB_NAME', 'my_tracks'),
)

# echo=True makes SQLAlchemy log every statement it emits.
engine = ce(url, echo=True)
connection = engine.connect()

2023-04-11 00:57:32,497 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2023-04-11 00:57:32,499 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-04-11 00:57:32,504 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2023-04-11 00:57:32,505 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-04-11 00:57:32,507 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2023-04-11 00:57:32,508 INFO sqlalchemy.engine.Engine [raw sql] {}


In [77]:
# Collect the Spark result into a pandas DataFrame (the form to_sql needs).
tracks_df = df_final.toPandas()


In [78]:

# Append the collected rows to the `my_tracks` table.
try:
    tracks_df.to_sql("my_tracks", engine, index=False, if_exists='append')
except Exception as exc:
    # if_exists='append' does not fail because rows are already present, so
    # anything caught here is a genuine error (connection, schema,
    # permissions, ...). Report it as such. The cell stays best-effort, as it
    # was before, by not re-raising.
    print(f"Could not write tracks to the database: {exc!r}")
finally:
    # Release the connection whether or not the write succeeded (this also
    # runs on KeyboardInterrupt, which is no longer swallowed).
    connection.close()
    print("Close database successfully")
    

2023-04-11 01:04:14,603 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-04-11 01:04:14,606 INFO sqlalchemy.engine.Engine DESCRIBE `my_tracks`.`my_tracks`
2023-04-11 01:04:14,607 INFO sqlalchemy.engine.Engine [raw sql] {}
2023-04-11 01:04:14,634 INFO sqlalchemy.engine.Engine ROLLBACK
2023-04-11 01:04:14,636 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-04-11 01:04:14,639 INFO sqlalchemy.engine.Engine 
CREATE TABLE my_tracks (
	track_name TEXT, 
	artist TEXT, 
	played_at TEXT
)


2023-04-11 01:04:14,640 INFO sqlalchemy.engine.Engine [no key 0.00084s] {}
2023-04-11 01:04:14,706 INFO sqlalchemy.engine.Engine COMMIT
2023-04-11 01:04:14,707 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-04-11 01:04:14,710 INFO sqlalchemy.engine.Engine INSERT INTO my_tracks (track_name, artist, played_at) VALUES (%(track_name)s, %(artist)s, %(played_at)s)
2023-04-11 01:04:14,711 INFO sqlalchemy.engine.Engine [generated in 0.00064s] [{'track_name': 'Diles', 'artist': 'Bad Bunny', 'played_at': '2

In [53]:
def mysqlconnect(query="select * from tracks"):
    """Run a query against the MySQL database and print every returned row.

    Connection settings are read from the environment variables
    ``TRACKS_DB_HOST``, ``TRACKS_DB_USER``, ``TRACKS_DB_PASSWORD`` and
    ``TRACKS_DB_NAME``. When a variable is unset, the value that used to be
    hard-coded in this function is used, so existing calls behave as before.

    Args:
        query: SQL statement to execute. Defaults to the statement this
            function originally ran, ``select * from tracks``.

    Returns:
        None. Each row is printed on its own line.
    """
    # To connect MySQL database
    conn = pymysql.connect(
        host=os.getenv('TRACKS_DB_HOST', '172.17.0.3'),
        user=os.getenv('TRACKS_DB_USER', 'root'),
        password=os.getenv('TRACKS_DB_PASSWORD', 'admin'),
        database=os.getenv('TRACKS_DB_NAME', 'my_tracks'),
    )

    try:
        # The cursor is closed when the with-block exits.
        with conn.cursor() as cur:
            cur.execute(query)
            for row in cur.fetchall():
                print(row)
    finally:
        # Close the connection even when the query raises.
        conn.close()

In [54]:
# Connectivity check: print the rows returned by the default query.
mysqlconnect()

('a', 'b', datetime.date(2023, 1, 2))
