In [0]:
%spark.pyspark
# Show the Spark version string reported by the `spark` object.
spark.version


In [1]:
%spark.pyspark
# Display the `sc` object (presumably the interpreter-provided SparkContext — confirm).
sc

In [2]:
%spark.pyspark
# Show the version string reported by `sc`.
sc.version

In [3]:
%spark.pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, expr
from pyspark.sql.types import StructType

In [4]:
%spark.pyspark
# Load both streaming-history JSON files, register each one as a temp view,
# and print the schema Spark inferred for it.

# Get the active SparkSession, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()

PATH0="/user/mijin/spotify/src/json/StreamingHistory0.json"
PATH1="/user/mijin/spotify/src/json/StreamingHistory1.json"

# multiLine=True is the keyword form of .option("multiline", "true"): it lets a
# single JSON document span several lines instead of one record per line.
df0 = spark.read.json(PATH0, multiLine=True)
df0.createOrReplaceTempView("df_streaming0")
df0.printSchema()

df1 = spark.read.json(PATH1, multiLine=True)
df1.createOrReplaceTempView("df_streaming1")
df1.printSchema()



In [5]:
%spark.pyspark
# 1) merge streaming dataframes
# UNION ALL keeps every play record. A plain UNION also de-duplicates, which
# silently drops plays that are identical in every column (for example the same
# track skipped twice with the same endTime and msPlayed).
# Note: the two SELECT * lists are matched by column position, not by name.
spark.sql("""
    SELECT * FROM df_streaming0
    UNION ALL
    SELECT * FROM df_streaming1
""").createOrReplaceTempView("df_stream")
print("done")

In [6]:
%spark.sql
-- count the rows in the merged streaming table
SELECT COUNT(*)
FROM df_stream

In [7]:
%spark.sql
-- preview: build the UniqueID key as "<artistName>:<trackName>"
SELECT
    *,
    CONCAT(artistName, ':', trackName) AS UniqueID
FROM df_stream
LIMIT 10;

In [8]:
%spark.pyspark
# 2) Register df_stream_unique: every df_stream column plus the UniqueID key.
unique_id_query = (
    "SELECT *, CONCAT(artistName, ':', trackName) AS UniqueID "
    "FROM df_stream"
)
spark.sql(unique_id_query).createOrReplaceTempView("df_stream_unique")
print("done")

In [9]:
%spark.sql
-- preview the keyed streaming table
SELECT *
FROM df_stream_unique
LIMIT 10;

In [10]:
%spark.sql
-- 3) preview: split 'endTime' on the space into endDate / endTime and add
--    mnPlayed (msPlayed divided by 60000, rounded to one decimal)

SELECT
    artistName,
    split(endTime, ' ')[0]        AS endDate,
    split(endTime, ' ')[1]        AS endTime,
    msPlayed,
    ROUND((msPlayed / 60000), 1)  AS mnPlayed,
    trackName,
    UniqueID
FROM df_stream_unique
LIMIT 20;


In [11]:
%spark.pyspark
# 4) Register df_stream_fin: df_stream_unique with endTime split into
#    endDate / endTime and an extra mnPlayed (minutes) column.
stream_fin = spark.table("df_stream_unique").selectExpr(
    "artistName",
    "split(endTime, ' ')[0] AS endDate",
    "split(endTime, ' ')[1] AS endTime",
    "msPlayed",
    "ROUND((msPlayed/60000),1) AS mnPlayed",
    "trackName",
    "UniqueID",
)
stream_fin.createOrReplaceTempView("df_stream_fin")
print('done')



In [12]:
%spark.sql
-- preview the final streaming view
SELECT * FROM df_stream_fin LIMIT 10;

In [13]:
%spark.sql
-- preview df_stream_fin with the column order used for the parquet export
SELECT
    artistName,
    trackName,
    endDate,
    endTime,
    mnPlayed,
    msPlayed,
    UniqueID
FROM df_stream_fin
LIMIT 10;

In [14]:
%spark.pyspark
# 5) Build the DataFrame that is stored as parquet: the df_stream_fin view
#    with its columns in export order.
export_columns = [
    "artistName",
    "trackName",
    "endDate",
    "endTime",
    "mnPlayed",
    "msPlayed",
    "UniqueID",
]
df_stream = spark.table("df_stream_fin").select(*export_columns)

df_stream.show(10)

In [15]:
%spark.pyspark
# 6) store final streaming file as parquet @hdfs
# NOTE(review): the writer's default save mode raises if the target already
# exists, so this cell fails on a second run. The lib/ and join/ outputs written
# later in this notebook live underneath this same directory, so switching this
# write to "overwrite" would remove them — consider a dedicated sub-directory.
STREAM_PARQUET_DIR = '/user/mijin/spotify/src/parquet'
df_stream.write.parquet(STREAM_PARQUET_DIR)


In [16]:
%spark.pyspark
from pyspark.sql.types import StructType, ArrayType, StringType, StructField

# 1) Load YourLibrary.json, register it as a temp view, and print its
#    inferred schema.

PATH2="/user/mijin/spotify/src/json/YourLibrary.json"

# multiLine=True is the keyword form of .option("multiline", "true").
df2 = spark.read.json(PATH2, multiLine=True)
df2.createOrReplaceTempView("df_library")
df2.printSchema()



In [17]:
%spark.pyspark

# 2) Flatten the first-level 'tracks' array: one row per array element, with
#    the element's fields promoted to top-level columns.

from pyspark.sql.functions import col, explode

track_fields = ("album", "artist", "track", "uri")

exploded_tracks = df2.select(explode("tracks").alias("track"))
df_tracks = exploded_tracks.select(
    [col(f"track.{field}").alias(field) for field in track_fields]
)

df_tracks.createOrReplaceTempView("lib_tracks")
df_tracks.printSchema()



In [18]:
%spark.sql
-- show the flattened library-tracks view (no LIMIT in the query itself)
SELECT *
FROM lib_tracks;

In [19]:
%spark.sql
-- preview: UniqueID key for library tracks, built as "<artist>:<track>"
SELECT
    *,
    CONCAT(artist, ':', track) AS UniqueID
FROM lib_tracks
LIMIT 10;

In [20]:
%spark.pyspark
# 3) Register df_lib_tracks: library tracks with the UniqueID key and
#    track_uri (the third ':'-separated piece of uri).
lib_tracks_keyed = spark.table("lib_tracks").selectExpr(
    "album",
    "artist",
    "track",
    "CONCAT(artist, ':', track) AS UniqueID",
    "split(uri, ':')[2] AS track_uri",
)
lib_tracks_keyed.createOrReplaceTempView("df_lib_tracks")
print("done")

In [21]:
%spark.sql
-- preview the keyed library-tracks view
SELECT *
FROM df_lib_tracks
LIMIT 5;

In [22]:
%spark.pyspark
# 4) Build the library-track DataFrame that is stored as parquet
#    (same projection as the df_lib_tracks view).
lib_track_query = """
SELECT album,
       artist,
       track,
       CONCAT(artist, ':', track) AS UniqueID,
       split(uri, ':')[2] AS track_uri
FROM lib_tracks
"""
df_track = spark.sql(lib_track_query)

df_track.show(10)


In [23]:
%spark.pyspark
# 5) store df_track @hdfs
# mode("overwrite") replaces any previous output. With the default save mode
# the write raises once the target directory exists, so the notebook could not
# be re-run from the top.
df_track.write.mode("overwrite").parquet('/user/mijin/spotify/src/parquet/lib')


In [24]:
%spark.sql
-- 1) left join streaming plays (s) to library tracks (l) on UniqueID to pick
--    up album and track_uri; plays without a library match keep NULLs there
SELECT
    s.artistName,
    s.endDate,
    s.endTime,
    s.msPlayed,
    s.mnPlayed,
    s.trackName,
    s.UniqueID,
    l.album,
    l.track_uri
FROM df_stream_fin AS s
LEFT JOIN df_lib_tracks AS l
    ON s.UniqueID = l.UniqueID

In [25]:
%spark.pyspark

# 2) Build df_join (stored as parquet afterwards): streaming plays left-joined
#    to library tracks on UniqueID.
# NOTE(review): if df_lib_tracks holds more than one row for a UniqueID, the
# join repeats the matching streaming rows — confirm the key is unique.

join_query = """
SELECT s.artistName,
       s.endDate,
       s.endTime,
       s.msPlayed,
       s.mnPlayed,
       s.trackName,
       s.UniqueID,
       l.album,
       l.track_uri
FROM df_stream_fin AS s
LEFT JOIN df_lib_tracks AS l
       ON s.UniqueID = l.UniqueID
"""
df_join = spark.sql(join_query)

df_join.show(10)


In [26]:
%spark.pyspark
# 3) store df_join as parquet type @hdfs
# mode("overwrite") replaces any previous output. With the default save mode
# the write raises once the target directory exists, so the notebook could not
# be re-run from the top.
df_join.write.mode("overwrite").parquet('/user/mijin/spotify/src/parquet/join/')

In [27]:
%spark.pyspark
# 1) Load the genre CSV (derived from the Spotify API), using its first line
#    as column names, then register it as a view and print the schema.
PATH_G = "/user/mijin/spotify/src/genre"

# header=True is the keyword form of .option("header", True).
df_g = spark.read.csv(PATH_G, header=True)
df_g.createOrReplaceTempView("df_genre")
df_g.printSchema()


In [28]:
%spark.pyspark
# 2) create genre table
# Build the narrowed view straight from the df_g DataFrame rather than from a
# query over df_genre itself. Replacing a temp view with a plan that reads the
# same view is rejected as a recursive view on some Spark 3.x releases, and
# going through df_g gives the same result every time the cell is run.
df_g.select("track_uri", "artist_uri", "genres").createOrReplaceTempView("df_genre")
print("done")

In [29]:
%spark.sql
-- preview the genre view
SELECT *
FROM df_genre
LIMIT 20;

In [30]:
%spark.pyspark
# prep for visualization 1) load streaming parquet file for visualization
dfs_parquet = spark.read.parquet('/user/mijin/spotify/src/parquet/join/*')
# cache() must be called: the bare attribute `dfs_parquet.cache` only
# references the method and caches nothing.
dfs_parquet.cache()
# count() is an action, so it also materializes the cache.
dfs_parquet.count()

In [31]:
%spark.pyspark
# Print the schema of the DataFrame loaded from the join parquet output.
dfs_parquet.printSchema()

In [32]:
%spark.pyspark
# prep for visualization 2) register the loaded parquet data as the
# "my_streaming" temp view so the SQL cells can query it
dfs_parquet.createOrReplaceTempView("my_streaming")


In [33]:
%spark.sql
-- preview plays that have a track_uri (non-NULL after the library join)
SELECT *
FROM my_streaming
WHERE track_uri IS NOT NULL
LIMIT 50;

In [34]:
%spark.sql
-- number of plays longer than 15 minutes
SELECT COUNT(*)
FROM my_streaming
WHERE mnPlayed > 15

In [35]:
%spark.pyspark

# prep for visualization 3) make a view of plays listened to for over 2 minutes
# (The previous version had a stray quote inside the triple-quoted SQL, which
# left the string unterminated and made the whole cell a SyntaxError.)

spark.sql("""
SELECT * FROM my_streaming
WHERE mnPlayed > 2""").createOrReplaceTempView("my_streaming_over2")
print('done')

In [36]:
%spark.sql
-- row count of the "over 2 minutes" view
SELECT COUNT(*)
FROM my_streaming_over2;

-- original : 13,552
-- actual :  11,160
-- differ : 2,392 (random songs popped out not my style?)

In [37]:
%spark.pyspark

# prep for visualization 4) from the "over 2 minutes" view, keep only plays of
# at most 15 minutes and register them as my_streaming_only_songs

only_songs = spark.table("my_streaming_over2").where("mnPlayed <= 15")
only_songs.createOrReplaceTempView("my_streaming_only_songs")
print('done')

In [38]:
%spark.sql
-- row count of the "2 to 15 minutes" view
SELECT COUNT(*)
FROM my_streaming_only_songs;

-- actual :  11,160
-- less than 15mins : 11,061
-- differ : 99 (maybe podcasts?)

In [39]:
%spark.sql
-- preview the songs-only view
SELECT *
FROM my_streaming_only_songs
LIMIT 10;

In [40]:
%spark.sql
-- by artist: top 20 artists by number of plays, with total minutes and hours.
-- Minutes are derived from the raw msPlayed sum (like the hours column) rather
-- than by summing the per-row rounded mnPlayed values, so rounding error does
-- not accumulate and the two columns agree. Alias typo "arist" is corrected.
SELECT artistName,
       COUNT(artistName) AS count_artist,
       ROUND(SUM(msPlayed)/60000,1) AS mnPlayed_artist,
       ROUND(SUM(msPlayed/3600000),1) AS hrPlayed_artist
FROM my_streaming_only_songs
GROUP BY artistName
ORDER BY COUNT(artistName) DESC
LIMIT 20;


In [41]:
%spark.sql
-- by track: top 20 (trackName, artistName) pairs by number of plays, with
-- total minutes and hours.
-- Minutes are derived from the raw msPlayed sum (like the hours column) rather
-- than by summing the per-row rounded mnPlayed values, so rounding error does
-- not accumulate and the two columns agree.
SELECT trackName,
       artistName,
       count(trackName) AS track_count,
       ROUND(SUM(msPlayed)/60000,1) AS mnPlayed_track,
       ROUND(SUM(msPlayed/3600000),1) AS hrPlayed_track
FROM
     my_streaming_only_songs
GROUP BY trackName, artistName
ORDER BY track_count DESC
LIMIT 20;


In [42]:
%spark.sql
-- morning time top list: top 20 tracks among plays whose endTime hour is
-- 05 through 11.
-- The outer query mixes plain columns with aggregates, so it needs a GROUP BY
-- (without it Spark rejects the query); ORDER BY / LIMIT make it a "top" list
-- in the same shape as the by-track query. Minutes are derived from the raw
-- msPlayed sum so they stay consistent with the hours column.
SELECT trackName,
       artistName,
       COUNT(*) AS track_count,
       ROUND(SUM(msPlayed)/60000,1) AS mnPlayed_track,
       ROUND(SUM(msPlayed/3600000),1) AS hrPlayed_track
FROM (
    SELECT trackName,
           artistName,
           endTime,
           mnPlayed,
           msPlayed
    FROM my_streaming_only_songs
    WHERE endTime LIKE '05:%' OR
          endTime LIKE '06:%' OR
          endTime LIKE '07:%' OR
          endTime LIKE '08:%' OR
          endTime LIKE '09:%' OR
          endTime LIKE '10:%' OR
          endTime LIKE '11:%'
) AS morning_plays
GROUP BY trackName, artistName
ORDER BY track_count DESC
LIMIT 20;
       
       


In [43]:
%spark.sql


In [44]:
%spark.sql


In [45]:
%spark.sql


In [46]:
%spark.pyspark
# Explicit schema for the 'albums' and 'tracks' keys of the library JSON.
# Each key holds an array of structs with string fields.
schema2 = StructType([
    StructField("albums", ArrayType(StructType([
        StructField("album", StringType(), nullable=True),
        StructField("artist", StringType(), nullable=True),
        StructField("uri", StringType(), nullable=True)
    ])), nullable=True),
    StructField("tracks", ArrayType(StructType([
        StructField("album", StringType(), nullable=True),
        StructField("artist", StringType(), nullable=True),
        StructField("track", StringType(), nullable=True),
        StructField("uri", StringType(), nullable=True)
    ])), nullable=True)
])

# Read the library JSON with the specified schema.
# PATH2 (the YourLibrary.json location defined in the library-loading cell)
# replaces the placeholder "path/to/my_library.json", which does not exist.
# multiLine matches how the same file is read when its schema is inferred.
df = spark.read.schema(schema2).option("multiLine", True).json(PATH2)

# Create a temporary view with the name 'my_library'
df.createOrReplaceTempView("my_library")

----------- scratch work (trial and error) from here down -----------

In [48]:
%spark.pyspark
# scratch work (trial and error)
from pyspark.sql import SparkSession
# Star import kept on purpose: a later scratch cell uses type names from it
# without importing them itself.
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

# Explicit schema for one streaming-history record.
schema = StructType([
    StructField("endTime", StringType(), nullable=True),
    StructField("artistName", StringType(), nullable=True),
    StructField("trackName", StringType(), nullable=True),
    StructField("msPlayed", LongType(), nullable=True)
])

# multiLine lets the reader parse a JSON document that spans several lines;
# without it Spark expects one complete JSON record per line.
df = (
    spark.read
    .schema(schema)
    .option("multiLine", True)
    .json("file:///home/mijin/space/spotify/json/StreamingHistory0.json")
)
df.show(10)

In [49]:
%spark.pyspark
from pyspark.sql import SparkSession
# Star import kept on purpose: a later scratch cell uses type names from it
# without importing them itself.
from pyspark.sql.types import *

# Explicit schema for one streaming-history record.
# The previous version of this cell did not parse (missing commas, a
# misspelled StructField, an unfinished entry and an unclosed read call) and
# never passed the schema to the reader.
# endTime is declared as a string: elsewhere in this notebook it is split on a
# space into a date part and a time part, so it is text, not a number.
schema = StructType([
    StructField("endTime", StringType(), True),
    StructField("artistName", StringType(), True),
    StructField("trackName", StringType(), True),
    StructField("msPlayed", LongType(), True),
])
spark = SparkSession.builder.getOrCreate()

# multiLine lets the reader parse a JSON document that spans several lines.
df = spark.read.json(
    "file:///home/mijin/space/spotify/json/StreamingHistory0.json",
    schema=schema,
    multiLine=True,
)
df.printSchema()


In [50]:
%python
import pandas as pd
import numpy as np
import requests

SPOTIFYHOME = "file:///home/mijin/space/spotify"


# read my 1+ StreamingHistory files (depending on how)
df_stream0 = pd.read_json(f"{SPOTIFYHOME}/json/StreamingHistory0.json")
df_stream1 = pd.read_json(f"{SPOTIFYHOME}/json/StreamingHistory1.json")

# merge streaming dataframes
# ignore_index=True renumbers the rows 0..n-1. Without it both inputs keep
# their own index labels, so the merged frame has duplicate labels and a
# label lookup such as df_stream.loc[0] returns more than one row.
df_stream = pd.concat([df_stream0, df_stream1], ignore_index=True)

# create a 'uniqueID' for each song by combining the fields 'artistName' and 'trackName'
df_stream['UniqueID'] = df_stream['artistName']+":"+df_stream['trackName']

df_stream.head()

In [51]:
%spark.pyspark
# NOTE(review): the functions star import rebinds names such as sum / min /
# max / round to Spark column functions in this interpreter session — confirm
# nothing relies on the Python builtins afterwards.
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Define the schema for your DataFrame.
# The previous version never closed the StructType([ ... ]) call, so the cell
# did not parse.
# endTime is declared as a string: elsewhere in this notebook it is split on a
# space into a date part and a time part, so it is text, not a number.
schema = StructType([
    StructField("endTime", StringType(), True),
    StructField("artistName", StringType(), True),
    StructField("trackName", StringType(), True),
    StructField("msPlayed", LongType(), True),
])

# multiLine lets the reader parse a JSON document that spans several lines.
# cache() marks the DataFrame for caching; show() below triggers the read.
df_stream0 = (
    spark.read
    .schema(schema)
    .option("multiLine", True)
    .json("file:///home/mijin/space/spotify/json/StreamingHistory0.json")
    .cache()
)

df_stream0.printSchema()
df_stream0.show()


In [52]:
%spark.pyspark

# Define the schema for my JSON data, read the file with it, and save the
# result as parquet.
# Explicit import so this cell does not depend on a star import executed in
# some other cell.
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

schema = StructType([
    StructField("endTime", TimestampType(), True),
    StructField("artistName", StringType(), True),
    StructField("trackName", StringType(), True),
    StructField("msPlayed", IntegerType(), True)
])

SPOTIFYHOME = "file:///home/mijin/space/spotify"
# multiLine lets the reader parse a JSON document that spans several lines.
# NOTE(review): endTime appears to be "<date> <time>" separated by a space. On
# Spark versions whose default JSON timestamp pattern expects an ISO-8601 'T'
# separator this column would come back NULL; if so, set
# .option("timestampFormat", ...) to the file's actual pattern — confirm.
df = (
    spark.read
    .schema(schema)
    .option("multiLine", True)
    .json(f"{SPOTIFYHOME}/json/StreamingHistory0.json")
)
# Save the DataFrame as a Parquet file. mode("overwrite") lets the cell be
# re-run; the default save mode raises once the target exists.
df.write.mode("overwrite").parquet(f"{SPOTIFYHOME}/parquet/StreamingHistory0.parquet")


In [53]:
%spark.pyspark

# Read back the parquet copy of StreamingHistory0 and show its first 50 rows.
SPOTIFYHOME = "file:///home/mijin/space/spotify"
history_parquet_path = f"{SPOTIFYHOME}/parquet/StreamingHistory0.parquet"
df = spark.read.parquet(history_parquet_path)
df.show(50)



In [54]:
%spark.pyspark
# Show the first rows of `df` (show() with no arguments uses its default row limit).
df.show()

In [55]:
%spark.pyspark
