# User Data Cleaning

In [2]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   
import findspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, count, from_json
from pyspark.sql.types import *
from awesome_spotify_packages.aws import SecretManager as secret_manager

In [3]:
# Spark configuration for reading from S3 through the s3a connector.
# Hadoop options are passed with the "spark.hadoop." prefix: that prefix is
# the documented way to forward a property to the Hadoop configuration, and
# it keeps the credentials-provider key consistent with fs.s3a.impl below.
conf = SparkConf()
conf.setAll([
    # Connector jars resolved when the JVM launches.
    ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262"),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    # Resolve credentials through the AWS SDK default provider chain.
    ("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"),
])

<pyspark.conf.SparkConf at 0x11670b4a0>

In [4]:
# Locate the local Spark installation so pyspark can launch it.
findspark.init()

# Build the SparkSession first. A SparkContext created beforehand without
# `conf` would be reused by the builder, and launch-time settings such as
# spark.jars.packages and the app name would then not be applied.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("awesome-api-potify")
    .getOrCreate()
)

# Expose the context owned by the session under the usual `sc` name.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/21 15:31:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Echo the SparkSession object as this cell's output.
spark

In [5]:
# Fetch the bucket name from the secrets manager instead of hard-coding it.
bucket_secret = secret_manager('dev/awesome_api_spotify/bucket_name')
bucket_name = bucket_secret.get_secret()

# Glob for the JSON files under the top-artists prefix, addressed via s3a.
wildcard_path = 'User/top/artists/*.json'
s3_path_with_wildcard = "s3a://{}/{}".format(bucket_name, wildcard_path)

In [6]:
# Load every file matched by the wildcard path; the "multiline" option lets a
# single JSON record span several lines.
artist = (
    spark.read
    .option("multiline", "true")
    .json(s3_path_with_wildcard)
)

24/05/21 15:31:30 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

24/05/21 15:31:36 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [13]:
# Keep only the columns used downstream. `artist` is rebound to the narrowed
# frame; re-running the cell is safe because these columns remain present.
artist = artist.select(col('id'), col('name'), col('genres'))

+--------------------+--------------+--------------------+
|                  id|          name|              genres|
+--------------------+--------------+--------------------+
|59oA5WbbQvomJz2Bu...|        Jungle|[indie soul, uk c...|
|5NGO30tJxFlKixkPS...|    The Police|[album rock, clas...|
|0OluGbRuQQEcYyttG...| Los Mesoneros|[venezuelan indie...|
|3WrFJ7ztbogyGnTHb...|   The Beatles|[british invasion...|
|6olE6TJLqED3rqDCT...|       Nirvana|[grunge, permanen...|
|36QJpDe2go2KgaRle...|  Led Zeppelin|[album rock, clas...|
|1QOmebWGB6FdFtW7B...|Gustavo Cerati|[argentine rock, ...|
|7An4yvF7hDYDolN4m...|   Soda Stereo|[argentine rock, ...|
|2zwh4WnVBGZcfnllC...|  Viniloversus|[caracas indie, r...|
|1Mxqyy3pSjf8kZZL4...| Frank Sinatra|[adult standards,...|
|587PA35pRGL1JwQr6...|  NEIL FRANCES|        [indie soul]|
|1IQ2e1buppatiN1bx...|        Slayer|[alternative meta...|
|3Te1e4bESVFb1KcrM...|Gerardo Millán|       [lo-fi study]|
|0Ty63ceoRnnJKVEYP...|         Sting|[permanent wave, ..

                                                                                

In [8]:
# Produce one row per (artist, genre) pair: explode the `genres` array into a
# scalar `genre` column, then drop the original array column.
# The source frame is `artist`; no `genres` DataFrame is defined by any
# earlier cell, so referencing one fails on a fresh kernel.
df_exploded = artist.withColumn("genre", explode(col("genres"))).drop("genres")

In [9]:
# Count rows per genre and rank genres from most to least frequent.
top_genres = (
    df_exploded
    .groupBy('genre')
    .agg(count('genre').alias('genre_count'))
    .orderBy(col('genre_count').desc())
)

In [10]:
# Print the highest-ranked genres: first 20 rows, long values truncated.
top_genres.show(n=20, truncate=True)

                                                                                

+-----------------+-----------+
|            genre|genre_count|
+-----------------+-----------+
|             rock|         37|
|   permanent wave|         14|
|      modern rock|         14|
|        hard rock|         11|
|              pop|         10|
|     classic rock|         10|
|alternative metal|          9|
|       indie soul|          9|
|              edm|          9|
|       album rock|          8|
| alternative rock|          8|
|      lo-fi cover|          7|
|      post-grunge|          7|
|        pop dance|          7|
|            house|          6|
|     indietronica|          6|
|        soft rock|          6|
|      lo-fi study|          5|
|       vocal jazz|          5|
|         nu metal|          5|
+-----------------+-----------+
only showing top 20 rows



In [63]:
# Stop the SparkSession at the end of the notebook.
# NOTE(review): the saved output of this cell is a ConnectionRefusedError and
# its execution count is out of sequence — presumably it was re-run after the
# session had already gone away; confirm with Restart Kernel -> Run All.
spark.stop()

ConnectionRefusedError: [Errno 61] Connection refused