# General Imports

In [None]:
# Setting up the environment
import pandas as pd
import boto3
import os
import time

# AWS Credentials and Settings
access_key = 'ACCESS_KEY'
secret_key = 'SECRET_ACCESS_KEY'

os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
encoded_secret_key = secret_key.replace("/", "%2F").replace("+", "%2B")

aws_region = 'us-east-1'

s3 = boto3.client(
    service_name='s3',
    region_name=aws_region,
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key
)



#Spark Stuff


In [None]:
# Setting up Spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, explode, col, collect_list, regexp_replace, split, expr, length, concat_ws
from pyspark.sql.types import ArrayType, StringType, IntegerType, FloatType, DoubleType
from pyspark import sql
import pyspark.pandas as ps

# Set up Spark
spark = SparkSession.builder \
    .appName("PicklesPlus") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint","s3." + aws_region + ".amazonaws.com") \
    .config("spark.executor.memory", "15g") \
    .config("spark.executor.cores", "2") \
    .config("spark.default.parallelism", "4") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

#CSV Cleaning

In [None]:
# Referenced https://stackoverflow.com/questions/70899029/how-to-get-all-rows-with-null-value-in-any-column-in-pyspark

def clean_spark_df(df):
    # Filters for null values
    null_filter = F.exists(F.array(*df.columns), lambda x: x.isNull())
    df_null = df.filter(null_filter)
    # Removes those entries
    df_fin = df.subtract(df_null)
    return(df_fin)

csv_paths = ['s3a://kagglespotify6k/landing/artists.csv', 's3a://kagglespotify6k/landing/final_playlists.csv', 's3a://kagglespotify6k/landing/final_tracks.csv','s3a://kagglespotify6k/landing/main_dataset.csv']

for path in csv_paths:
    file_load = spark.read.csv(path, header = True, inferSchema = True)
    df_clean = clean_spark_df(file_load)
    file_name = path.split('/')[-1][:-4] + '.parquet'
    df_clean.write.mode('overwrite').parquet(f's3a://kagglespotify6k/raw/{file_name}')
    print(file_name)

artists.parquet
final_playlists.parquet
final_tracks.parquet
main_dataset.parquet


In [None]:
# The genre column in main_dataset is a string that looks like a nested list, will convert it to a normal list of strings
main_df = spark.read.parquet('s3a://kagglespotify6k/raw/main_dataset.parquet/')

# Remove brackets
main_df = main_df.withColumn('artists_genres', regexp_replace('artists_genres', r"[\[\]]", ""))

# Remove single quotes then split into individual genres
main_df = main_df.withColumn('artists_genres', split(regexp_replace('artists_genres', "'", ""), ",\s*"))

# The track_uri column contains an unnecessary sub-string in the front "spotify:track:", will remove it
main_df = main_df.withColumn('track_uri', regexp_replace('track_uri', "spotify:track:", ""))

# Overwrite to save
main_df.write.mode('overwrite').parquet('s3a://kagglespotify6k/raw/main_dataset.parquet/')

#txt Cleaning

In [None]:
# Earlier EDA showed that the txt files do not have null values, but they do have some duplicates
txt_paths = ['s3a://kagglespotify6k/landing/im_getting_these_vibes_uknow.txt', 's3a://kagglespotify6k/landing/music_genres.txt']

for txt_path in txt_paths:
    text_load = spark.read.csv(txt_path, header = False, inferSchema = True)
    text_load = text_load.dropDuplicates()
    file_name = txt_path.split('/')[-1][:-4] + '.parquet'
    text_load.write.parquet(f's3a://kagglespotify6k/raw/{file_name}')
    print(file_name)


im_getting_these_vibes_uknow.parquet
music_genres.parquet


In [None]:
# After some EDA, I noticed that the genres txt file is missing a significant amount of genres that are present in the main_dataset.csv
# Will use the main_dataset to add the missing genres
fix_genre_df =  spark.read.parquet('s3a://kagglespotify6k/raw/music_genres.parquet/')
fix_genre_main = spark.read.parquet('s3a://kagglespotify6k/raw/main_dataset.parquet/')

# Need to explode main_dataset's artists_genres column to get all the individual genres
exploded_df = fix_genre_main.select(explode('artists_genres').alias('genre'))
genres_from_main_df = exploded_df.distinct()

# Get the genres from the raw parquet and rename the column
pre_fix_genres_df = fix_genre_df.select('_c0').distinct()
pre_fix_genres_df = pre_fix_genres_df.withColumnRenamed('_c0','genre')

# Combine 2 dataframes to get new genres dataframe
new_genres_df = pre_fix_genres_df.union(genres_from_main_df)

# Write new_genres_df as a parquet to raw
new_genres_df.write.mode('overwrite').parquet('s3a://kagglespotify6k/raw/fixed_genres_v1.parquet/')