In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

22/06/24 09:13:19 WARN Utils: Your hostname, DESKTOP-EJLBN3A resolves to a loopback address: 127.0.1.1; using 172.20.24.149 instead (on interface eth0)
22/06/24 09:13:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/06/24 09:13:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [201]:
### DATA PROFILING

# Reads the csv in the data directory into a spark dataframe
spark_df = (spark.read.format("csv").options(header="true").load("./data/spotify_artists.csv"))


# .describe(), with no arguments prints the dataframe column datatypes 
spark_df.describe()
spark_df.printSchema()
# select allows you to pick specific columns of the dataframe, and perform operation on if you choose.
spark_df.select(spark_df.name, spark_df.genres).show(10)

[Stage 454:>                                                        (0 + 1) / 1]

root
 |-- row: string (nullable = true)
 |-- artist_popularity: string (nullable = true)
 |-- followers: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- track_name_prev: string (nullable = true)
 |-- type: string (nullable = true)

+--------------------+--------------------+
|                name|              genres|
+--------------------+--------------------+
|       Juliano Cezar|['sertanejo', 'se...|
|      The Grenadines|                  []|
|             Gangway| ['danish pop rock']|
|               FADES|['uk alternative ...|
| Jean-Pierre Guignon|  ['french baroque']|
|              Filhos|                  []|
|                Eloq|                  []|
|              Fravær|                  []|
|       Camille Pépin|                  []|
|Pepe Willberg & T...|['classic finnish...|
+--------------------+--------------------+
only showing top 10 ro

                                                                                

In [210]:
### DATA CLEANING

from pyspark.sql.functions import regexp_replace, col, udf
from pyspark.sql.types import IntegerType

spark_df = (spark.read.format("csv").options(header="true").load("./data/spotify_artists.csv"))

# Replace the genres column's empty list values with ['elevator music']
# spark_df.filter("genres = '[]' ").show(10)
spark_df = spark_df.withColumn("genres", regexp_replace("genres", r"\[]", r"['elevator music']"))
spark_df.show(10)

# Change these column data types from string to integer so that arithmetic can be performed on them further down
spark_df = spark_df.withColumn("artist_popularity", spark_df["artist_popularity"].cast(IntegerType()))
spark_df = spark_df.withColumn("followers", spark_df["followers"].cast(IntegerType()))
print(spark_df.dtypes)

# Two different ways to use sort (though the sort order changes..) 
# .sort() without also passing col() left me unable to use the .withColumn() method afterwards, so with col() was preferred
# spark_df = spark_df.sort("followers", descending=True)
spark_df = spark_df.sort(col("followers").desc())
spark_df.show(3)

# Here I define a function that I can use to apply arithmetic to a column value
def div_by_100(num):
    """
    Divide a column with type int by 100, obtaining a percentage
    """
    num = num/100
    return num

# Assign a user defined function to work across an entire column
udf_div_by_100 = udf(lambda x:div_by_100(x))

# Use of said function
spark_df = spark_df.withColumn("artist_popularity", udf_div_by_100("artist_popularity"))
spark_df = spark_df.withColumnRenamed("artist_popularity", "popularity_percent")
spark_df.show(3)

+---+-----------------+---------+--------------------+--------------------+--------------------+--------------------+---------------+------+
|row|artist_popularity|followers|              genres|                  id|                name|            track_id|track_name_prev|  type|
+---+-----------------+---------+--------------------+--------------------+--------------------+--------------------+---------------+------+
|  0|               44|    23230|['sertanejo', 'se...|4mGnpjhqgx4RUdsIJ...|       Juliano Cezar|0wmDmAILuW9e2aRtt...|        track_9|artist|
|  1|               22|      313|  ['elevator music']|1dLnVku4VQUOLswwD...|      The Grenadines|4wqwj0gA8qPZKLl5W...|       track_30|artist|
|  2|               26|     1596| ['danish pop rock']|6YVY310fjfUzKi8hi...|             Gangway|1bFqWDbvHmZe2f4Nf...|       track_38|artist|
|  3|               31|      149|['uk alternative ...|2VElyouiCfoYPDJlu...|               FADES|3MFSUBAidPzRBbIS7...|       track_34|artist|
|  4|        

In [220]:
### EXTRACT INFORMATION

# Create a SQL table of the current dataframe for querying
spark_df.createOrReplaceTempView("spark_tbl")
spark.sql("SELECT * FROM spark_tbl WHERE name = 'Queen'").show(5)
spark.sql("SELECT COUNT(*) FROM spark_tbl GROUP BY POPULARITY_PERCENT").show(5)

+-----+------------------+---------+--------------------+--------------------+-----+--------------------+---------------+------+
|  row|popularity_percent|followers|              genres|                  id| name|            track_id|track_name_prev|  type|
+-----+------------------+---------+--------------------+--------------------+-----+--------------------+---------------+------+
|39126|              0.94| 14130233|['glam rock', 'ro...|1dfeR4HaWDbWqFHLk...|Queen|5oidljiMjeJTWUGZ4...|       track_12|artist|
+-----+------------------+---------+--------------------+--------------------+-----+--------------------+---------------+------+

+--------+
|count(1)|
+--------+
|     625|
|       1|
|     441|
|      96|
|     489|
+--------+
only showing top 5 rows



In [226]:
### SAVE AS PARQUET

# Simply writing the transformed dataframe to a parquet file and reading it back
# spark_df.write.parquet("./data/spotify_artists.parquet")
spark_parq = spark.read.parquet("./data/spotify_artists.parquet/")
spark_parq.show(10)

+---+------------------+---------+------------------+--------------------+--------------------+--------------------+---------------+------+
|row|popularity_percent|followers|            genres|                  id|                name|            track_id|track_name_prev|  type|
+---+------------------+---------+------------------+--------------------+--------------------+--------------------+---------------+------+
| 25|              0.21|        0|['elevator music']|6CJCoqivxcdr8zlVe...|        Reiko Tsuiki|7Bdop7N4EKLMk4riE...|       track_92|artist|
| 53|              0.02|        0|['elevator music']|7uLlA3ipeSM7lUSmr...|                BiGz|0ebArfrdaqfhwOmBP...|      track_110|artist|
|107|               0.2|        0|['elevator music']|6WxTDr3eh88JEx91L...|      Roldán Bernabé|2r3q57FhxdsCyYr0k...|       track_28|artist|
|115|              0.22|        0|['elevator music']|1BEyKcixGeLyvFwGA...|Carl Philipp Emanuel|3G3qLFFcPo4khWwE1...|       track_64|artist|
|127|              0