# BIG DATA PROCESSIN PROJECT

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, col, isnan, when, count, mean ,stddev, expr

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, IntegerType

## 1. Create a SparkSession

In [2]:
spark = SparkSession.builder.appName("Spotify").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 10:16:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

## 2. Data loading and cleaning
### 2.1 Read CSV File into DataFrame

In [4]:
# Read CSV file into DataFrame with header
csv_file_path =("./data/spotify-data.csv")
df = spark.read.csv(csv_file_path,header=True)

In [5]:
# Display schema of DataFrame
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- key: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- explicit: string (nullable = true)



24/11/14 10:16:26 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### 2.2. Read CSV File into DataFrame with my_schema

- id: str, identifier of the track.
- name: str, name of the track.
- artists: str, artists of the track.
- duration_ms: float, duration of the track in milliseconds.
- release_date: date, release date of the track.
- year: int, release year of the track.
- acousticness: float, measure of acousticness of the track.
- danceability: float, measure of danceability of the track.
- energy: float, measure of energy of the track.
- instrumentalness: float, measure of instrumental elements in the track.
- liveness: float, measure of liveness of the track.
- loudness: float, loudness of the track.
- speechiness: float, measure of speechiness in the track.
- tempo: float, tempo of the track.
- valence: float, measure of valence (positivity) of the track.
- mode: int, mode of the track (major or minor).
- key: int, key of the track.
- popularity: int, popularity score of the track.
- explicit: int, indication of explicit content presence (explicit or implicit).

In [186]:
df_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(),True),
    StructField("artists", StringType(),True),
    StructField("duration_ms", DoubleType(), True),
    StructField("release_date", StringType(),True),
    StructField("year",IntegerType(),True),
    StructField("acousticness",DoubleType(),True),
    StructField("danceability",DoubleType(),True),
    StructField("energy",DoubleType(),True),
    StructField("instrumentalness",DoubleType(),True),
    StructField("liveness",DoubleType(),True),
    StructField("loudness",DoubleType(),True),
    StructField("speechiness",DoubleType(),True),
    StructField("tempo",DoubleType(),True),
    StructField("valence",DoubleType(),True),
    StructField("mode",IntegerType(),True),
    StructField("key",IntegerType(),True),
    StructField("popularity",IntegerType(),True),
    StructField("explicit",IntegerType(),True)
])

In [187]:
# Read CSV file into DataFrame with header
csv_file_path =("./data/spotify-data.csv")
df = spark.read.csv(csv_file_path,header=True, schema=df_schema)

In [188]:
# Display schema of DataFrame
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- key: integer (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- explicit: integer (nullable = true)



In [189]:
# Display content of df
df.show()

+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|                  id|                name|             artists|duration_ms|release_date|year|acousticness|danceability| energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode|key|popularity|explicit|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|   158648.0|        1928|1928|       0.995|       0.708|  0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779|   1| 10|         0|       0|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert Schumann...|   282133.0|        1928|1928|       0.994|       0.

In [190]:
df.where(col("id") == "6TFuAErGpJ9FpxQQ1HC8nM").show()

+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|                  id|                name|             artists|duration_ms|release_date|year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode|key|popularity|explicit|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|6TFuAErGpJ9FpxQQ1...|Invocación al Tan...|['Francisco Canar...|   167107.0|     9/16/28|1928|       0.994|       0.787| 0.156|           0.659|    0.11| -14.056|      0.157|117.167|  0.849|   0|  4|         0|       0|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+

### 3 Calculating basic statistics

In [None]:
 # 3.1. Transformation of the Columna date

In [56]:
#from pyspark.sql.functions import to_date

#df = df.withColumn("release_date", to_date(col("release_date"),"dd/MM/yy"))

In [61]:
#df.where(col("id") == "6TFuAErGpJ9FpxQQ1HC8nM").show()

In [191]:
null_counts = df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns])
null_counts.show()


+---+----+-------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+
| id|name|artists|duration_ms|release_date|year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|tempo|valence|mode|key|popularity|explicit|
+---+----+-------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+
|  0|   0|      0|       1447|           0| 692|         285|         138|    70|              44|      23|       8|          8|    4|      2|1438|614|       326|     187|
+---+----+-------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+



#### 3.1 Row Counting and Unique Values

In [192]:
print("Rows:", df.count())
print("Col:", len(df.columns))


Rows: 169909
Col: 19


In [193]:
for col_name in df.columns:
    unique_count = df.select(col_name).distinct().count()
    print(f"Column {col_name} has {unique_count} unique")

Column id has 169909 unique
Column name has 132895 unique
Column artists has 33567 unique
Column duration_ms has 49937 unique
Column release_date has 12053 unique
Column year has 375 unique
Column acousticness has 4935 unique
Column danceability has 1413 unique
Column energy has 2412 unique
Column instrumentalness has 5443 unique
Column liveness has 2081 unique
Column loudness has 25992 unique
Column speechiness has 2506 unique
Column tempo has 85123 unique
Column valence has 2696 unique
Column mode has 5 unique
Column key has 13 unique
Column popularity has 102 unique
Column explicit has 71 unique


#### 3.2 Descriptive Statistics


In [194]:
df.describe().show()



+-------+--------------------+----------------------------+--------------------+------------------+--------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+-------------------+
|summary|                  id|                        name|             artists|       duration_ms|        release_date|              year|     acousticness|      danceability|            energy| instrumentalness|          liveness|          loudness|      speechiness|            tempo|           valence|              mode|              key|       popularity|           explicit|
+-------+--------------------+----------------------------+--------------------+------------------+--------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+----

                                                                                

### 4. Data cleaning

#### 4.2 Clean and Standardize release_date
##### 4.2.1 Extract day and month of "date_format"

In [195]:
from pyspark.sql.functions import regexp_extract, col, to_date, when
from pyspark.sql import functions as F

In [138]:
#df = df.withColumn("year_only", regexp_extract(col("release_date"),"^(\\d{4})$",1))
#df = df.withColumn("date_format", regexp_extract(col("release_date"),"^(\\d{1,2})/(\\d{1,2})/(\\d{2})$",0))

In [196]:
df = df.withColumn("month", F.split("release_date","/")[0].cast("int"))
df = df.withColumn("day", F.split("release_date","/")[1].cast("int"))

In [197]:
df.select("release_date","year","day","month").show()

+------------+----+----+-----+
|release_date|year| day|month|
+------------+----+----+-----+
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|     9/25/28|1928|  25|    9|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|     10/3/28|1928|   3|   10|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|      1/1/28|1928|   1|    1|
|        1928|1928|NULL| 1928|
|        1928|1928|NULL| 1928|
|     9/16/28|1928|  16|    9|
|        1928|1928|NULL| 1928|
|     9/17/28|1928|  17|    9|
+------------+----+----+-----+
only showing top 20 rows



##### 4.2.2 Combine year,day, month

In [198]:
df = df.withColumn("date_combined",
                   F.expr("make_date(year, month,day)"))

In [199]:
df.select("release_date","year","day","month","date_combined").show()

+------------+----+----+-----+-------------+
|release_date|year| day|month|date_combined|
+------------+----+----+-----+-------------+
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|     9/25/28|1928|  25|    9|   1928-09-25|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|     10/3/28|1928|   3|   10|   1928-10-03|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|      1/1/28|1928|   1|    1|   1928-01-01|
|        1928|1928|NULL| 1928|         NULL|
|        1928|1928|NULL| 1928|         NULL|
|     9/16/28|1928|  16|    9|   1928-09-16|
|        1928|1928|NULL| 1928|         NULL|
|     9/17

##### 4.2.3 Convert `date_combined` to yyyy-MM-dd format 

In [200]:
df = df.withColumn("formatted_date",F.date_format("date_combined","yyyy-MM-dd"))

In [201]:
df.select("release_date","year","day","month","date_combined","formatted_date").show()

+------------+----+----+-----+-------------+--------------+
|release_date|year| day|month|date_combined|formatted_date|
+------------+----+----+-----+-------------+--------------+
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|     9/25/28|1928|  25|    9|   1928-09-25|    1928-09-25|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|     10/3/28|1928|   3|   10|   1928-10-03|    1928-10-03|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         NULL|          NULL|
|        1928|1928|NULL| 1928|         N

In [202]:
df = df.drop("month", "day", "date_combined")

In [203]:
df = df.withColumn("formatted_date",
                   F.when(F.col("formatted_date").isNull(),
                          F.concat(F.col("year").cast("string"),F.lit("-01-01")))
                   .otherwise(F.col("formatted_date")))

In [204]:
df.select("release_date", "year", "formatted_date").show()

+------------+----+--------------+
|release_date|year|formatted_date|
+------------+----+--------------+
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|     9/25/28|1928|    1928-09-25|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|     10/3/28|1928|    1928-10-03|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|      1/1/28|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|        1928|1928|    1928-01-01|
|     9/16/28|1928|    1928-09-16|
|        1928|1928|    1928-01-01|
|     9/17/28|1928|    1928-09-17|
+------------+----+--------------+
only showing top 20 rows



In [206]:
df = df.drop("release_date","year")

In [207]:
null_counts = df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df.columns])
null_counts.show()

+---+----+-------+-----------+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+--------------+
| id|name|artists|duration_ms|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|tempo|valence|mode|key|popularity|explicit|formatted_date|
+---+----+-------+-----------+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+--------------+
|  0|   0|      0|       1447|         285|         138|    70|              44|      23|       8|          8|    4|      2|1438|614|       326|     187|           692|
+---+----+-------+-----------+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+--------------+

