In [0]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Artist dimension: every distinct `artist` value found in the bronze album
# table becomes one row, exposed under the column name `name`.
bronze_album_df = spark.read.table("music.bronze_album")
silver_data = bronze_album_df.select(col("artist").alias("name")).distinct()

# Number the names in `name` order to obtain a surrogate key. The window has
# no partitionBy, so the numbering runs across the whole DataFrame.
window_spec = Window.orderBy("name")
silver_artist_df = silver_data.select(
    row_number().over(window_spec).cast("long").alias("id"),
    "name",
)

In [0]:
# Sanity check: show up to five rows of the artist DataFrame (columns id, name).
display(silver_artist_df.limit(5))

In [0]:
# Inspect up to 50 raw release_date values from the bronze album table.
bronze_album_df = spark.read.table("music.bronze_album")
display(
    bronze_album_df
    .select(bronze_album_df["release_date"])
    .limit(50)
)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import coalesce, col, row_number, to_date, when, year

bronze_album_df = spark.read.table("music.bronze_album")

# De-duplicate on (artist, album_name): number the rows of each pair in
# release_date order and keep only the row numbered 1.
window_spec = Window.partitionBy("artist", "album_name").orderBy("release_date")
distinct_album_df = bronze_album_df.withColumn(
    "row_num", row_number().over(window_spec)
).where(col("row_num") == 1)

# Left join against the artist DataFrame so that albums whose artist has no
# match are kept (their artist_id is null). The helper row_num column is not
# selected, so it does not appear in the result.
silver_album_df = distinct_album_df.join(
    silver_artist_df,
    on=distinct_album_df["artist"] == silver_artist_df["name"],
    how="left",
).select(
    distinct_album_df["id"],
    silver_artist_df["id"].alias("artist_id"),
    distinct_album_df["album_name"].alias("name"),
    distinct_album_df["release_year"],
    distinct_album_df["release_date"],
)

display(silver_album_df.limit(5))

In [0]:
# Data-quality check: count album rows whose artist_id is null.
null_artist_id_count = silver_album_df.where(col("artist_id").isNull()).count()
null_artist_id_count

In [0]:
# Attach album and artist keys to each song by matching the song's
# album_number to the album id. No join type is given, so this is the default
# (inner) join: songs without a matching album are not in the result.
bronze_song_df = spark.read.table("music.bronze_song")
silver_song_df = bronze_song_df.join(
    silver_album_df,
    on=bronze_song_df["album_number"] == silver_album_df["id"],
).select(
    silver_album_df["artist_id"],
    silver_album_df["id"].alias("album_id"),
    bronze_song_df["name"],
    bronze_song_df["track_number"],
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Give every song a surrogate id by numbering the rows in
# (artist_id, album_id, track_number) order, and put id first in the output.
window_spec = Window.orderBy("artist_id", "album_id", "track_number")
silver_song_df = silver_song_df.select(
    row_number().over(window_spec).alias("id"),
    "artist_id",
    "album_id",
    "name",
    "track_number",
)

display(silver_song_df.limit(15))

In [0]:
# Load the album-club member table; this notebook only reads it.
silver_member_df = spark.read.table("music.silver_album_club_member")

In [0]:
# Show the member table in full (no limit is applied).
display(silver_member_df)

In [0]:
# Load the music.silver_album table and show up to ten rows.
# NOTE(review): this rebinds `silver_album_df`, which an earlier cell builds
# in memory from bronze_album; from here on the name refers to the table
# contents instead — confirm that is intended.
silver_album_df = spark.read.table("music.silver_album")
display(silver_album_df.limit(10))

In [0]:
%sql
-- List each distinct album_number present in the bronze member ratings.
SELECT DISTINCT album_number
FROM music.bronze_member_rating

In [0]:
bronze_member_rating_df = spark.read.table("music.bronze_member_rating")
silver_album_df = spark.read.table("music.silver_album")

# Left join ratings to albums on the album name, so every rating row is kept
# whether or not an album with that name exists.
joined_df = bronze_member_rating_df.join(
    silver_album_df,
    on=bronze_member_rating_df["album_name"] == silver_album_df["name"],
    how="left",
)
# NOTE(review): `silver_data` is also bound to this join result, which
# replaces the distinct-artist DataFrame of the same name built in the first
# cell — looks accidental; confirm before relying on `silver_data` later.
silver_data = joined_df

# Row count of the join result.
joined_df.count()

In [0]:
bronze_member_rating_df = spark.read.table("music.bronze_member_rating")
bronze_album_df = spark.read.table("music.bronze_album")

# Album names that appear in the member ratings but have no album of the same
# name in bronze_album.
#
# After select("album_name").distinct() the only remaining column is
# album_name, so the result is sorted by that column. Sorting by release_year
# at this point fails analysis: the column is no longer in the projection and
# Spark does not pull missing sort columns through distinct().
missing_albums_df = (
    bronze_member_rating_df
    .join(
        bronze_album_df,
        bronze_member_rating_df.album_name == bronze_album_df.album_name,
        "left_anti",
    )
    .select("album_name")
    .distinct()
    .orderBy("album_name")
)

display(missing_albums_df)

In [0]:
%sql
-- Look up bronze album rows whose artist contains 'Pearl Jam'.
SELECT *
FROM music.bronze_album
WHERE artist LIKE '%Pearl Jam%'

In [0]:
from pyspark.sql.functions import col

# Show the rating rows that have no album_name at all.
bronze_member_rating_df = spark.read.table("music.bronze_member_rating")
null_album_name = bronze_member_rating_df.where(
    bronze_member_rating_df["album_name"].isNull()
)
display(null_album_name)

In [0]:
%sql
-- Data fix applied in place to the bronze ratings table: rewrite the
-- album_name 'John Prine '71' as 'John Prine'.
UPDATE music.bronze_member_rating
SET album_name = 'John Prine'
WHERE album_name = 'John Prine \'71'

In [0]:
    bronze_member_rating_df = spark.read.table("music.bronze_member_rating")
    silver_member_df = spark.read.table("music.silver_album_club_member")
    silver_album_df = spark.read.table("music.silver_album")

    # Resolve each rating to its album (album_number -> album id) and to its
    # member (member -> short_name). Both joins use the default (inner) join
    # type, so ratings without a matching album or member are not in the result.
    rating_with_album = bronze_member_rating_df.join(
        silver_album_df,
        on=bronze_member_rating_df["album_number"] == silver_album_df["id"],
    )
    rating_with_member = rating_with_album.join(
        silver_member_df,
        on=bronze_member_rating_df["member"] == silver_member_df["short_name"],
    )

    # NOTE(review): the result holds member ratings but is bound to
    # `silver_song_df`, replacing the song DataFrame built in earlier cells —
    # confirm the name is intended.
    silver_song_df = rating_with_member.select(
        silver_album_df["artist_id"],
        silver_album_df["id"].alias("album_id"),
        silver_member_df["id"].alias("member_id"),
        bronze_member_rating_df["week_number"],
        bronze_member_rating_df["meeting_date"],
        bronze_member_rating_df["rating"],
        bronze_member_rating_df["favorite_songs"],
    )
    display(silver_song_df.limit(15))