In [None]:
import happybase
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from functools import reduce
from pyspark.sql.types import DateType, StringType,ArrayType
from pyspark.sql.functions import rank,col, when, date_format, explode, split,regexp_replace, udf, datediff, avg, count,countDistinct,first, expr,lit

import time

In [None]:
# Connect to HBase through happybase and fetch handles for both source tables.
VM_adress = 'localhost'
connection = happybase.Connection(host=VM_adress, timeout=999999)
dynamic_table, static_table = (
    connection.table(table_name) for table_name in ('daily_data', 'spotify_songs')
)

In [None]:
# Initialise the Spark session used by every cell below.
session_builder = SparkSession.builder.appName("SpotifyAnalysis")
spark = session_builder.getOrCreate()

In [None]:
def create_spark_df(tab, spark):
    """Load every row returned by ``tab.scan(None)`` into a Spark DataFrame.

    Args:
        tab: Table handle exposing ``scan``; the scan must yield
            ``(row_key, data)`` pairs where the key, the column names and the
            values are all bytes.
        spark: Object providing ``createDataFrame`` (the Spark session).

    Returns:
        The result of ``spark.createDataFrame`` applied to a list with one
        dictionary per row. Each dictionary stores the UTF-8 decoded row key
        under ``'id'`` followed by one entry per decoded column, so every
        value is a string.
    """
    def decode(raw):
        return raw.decode('utf-8')

    # The complete scan is materialised in local memory before Spark sees it.
    records = [
        {
            'id': decode(row_key),
            **{decode(name): decode(cell) for name, cell in cells.items()},
        }
        for row_key, cells in tab.scan(None)
    ]
    return spark.createDataFrame(records)

In [None]:
# Pull both HBase tables into Spark DataFrames.
daily_data = create_spark_df(dynamic_table, spark)
spotify_songs = create_spark_df(static_table, spark)


def _strip_data_prefix(frame):
    """Rename every column of ``frame``, removing the text "data:" from its name."""
    for column_name in frame.columns:
        frame = frame.withColumnRenamed(column_name, column_name.replace("data:", ""))
    return frame


dynamic_df = _strip_data_prefix(daily_data)
static_df = _strip_data_prefix(spotify_songs)


In [None]:
# Functions for transforming the DataFrames and computing the aggregations

def remove_conflicting_columns(df, drop_columns):
    """Return ``df`` with every column named in ``drop_columns`` dropped."""
    unwanted = tuple(drop_columns)
    return df.drop(*unwanted)

def mark_missing_tracks(df, faulty_df):
    """Add a boolean ``missing`` column flagging tracks absent from ``faulty_df``.

    A row is flagged when its ``track_id`` appears in ``df`` but not in
    ``faulty_df``. The set of such ids is collected to the driver and used in
    an ``isin`` test.

    Args:
        df: DataFrame with a ``track_id`` column; it receives the new column.
        faulty_df: DataFrame with a ``track_id`` column to compare against.

    Returns:
        ``df`` with the additional ``missing`` column.
    """
    missing_ids = df.select("track_id").subtract(faulty_df.select("track_id"))
    # A NULL id must not reach the IN-list: `x IN (..., NULL)` evaluates to
    # NULL rather than FALSE for every non-matching x, which would turn
    # "missing" into NULL for all present tracks and make any
    # `missing == False` filter discard them.
    missing_tracks_list = [
        row[0] for row in missing_ids.collect() if row[0] is not None
    ]
    return df.withColumn("missing", col("track_id").isin(missing_tracks_list))

def merge_dataframes(dynamic_df, static_df):
    """Left-outer join ``dynamic_df`` with ``static_df`` on ``track_id``."""
    return dynamic_df.join(static_df, on="track_id", how="left_outer")

def assure_date_type(df, date_columns):
    """Cast each column named in ``date_columns`` to ``DateType``.

    Columns are processed in the given order; ``df`` is returned unchanged
    when ``date_columns`` is empty.
    """
    def cast_to_date(frame, name):
        return frame.withColumn(name, col(name).cast(DateType()))

    return reduce(cast_to_date, date_columns, df)

def assign_unique_track_id(df):
    """Replace three columns with their first value inside each track window.

    Rows are partitioned by ``track_id``, ``artist_id`` and
    ``track_album_name`` and ordered by ``track_album_release_date``; the
    columns ``track_id``, ``track_album_release_date`` and ``missing`` are
    then overwritten with the first value of that ordered window.

    NOTE(review): ``track_id`` is itself part of the partition key, so taking
    its first value cannot change it. If the intent was to unify differing ids
    of the same song, the partition key probably needs revisiting - confirm.
    """
    release_ordered = (
        Window.partitionBy("track_id", "artist_id", "track_album_name")
        .orderBy("track_album_release_date")
    )

    result = df
    for name in ("track_id", "track_album_release_date", "missing"):
        result = result.withColumn(name, first(name).over(release_ordered))
    return result

def add_season_column(df):
    """Add a ``season`` column derived from the month and day of ``date_added``.

    The date is formatted as ``MM-dd`` and compared as text against fixed
    ranges: 12-22..03-20 Winter, 03-21..06-21 Spring, 06-22..09-22 Summer and
    09-23..12-21 Autumn. Rows matching none of the ranges receive the string
    "None".
    """
    month_day = date_format(col("date_added"), "MM-dd")

    # Winter wraps around the year end, so it needs two separate ranges.
    is_winter = month_day.between("12-22", "12-31") | month_day.between("01-01", "03-20")

    season = (
        when(is_winter, "Winter")
        .when(month_day.between("03-21", "06-21"), "Spring")
        .when(month_day.between("06-22", "09-22"), "Summer")
        .when(month_day.between("09-23", "12-21"), "Autumn")
        .otherwise("None")
    )
    return df.withColumn("season", season)

def add_seasonal_event_column(df, carnival_start="2024-01-07", carnival_end="2024-03-06"):
    """Add a ``seasonal_event`` column classifying ``date_added``.

    Conditions are evaluated in order and the first match wins:

    * ``MM-dd`` between 12-02 and 12-28 -> "Christmas"
    * ``MM-dd`` between 01-01 and 01-02 -> "New Year"
    * full date between ``carnival_start`` and ``carnival_end`` -> "Carnival"
    * anything else -> "Normal"

    Args:
        df: DataFrame with a ``date_added`` column.
        carnival_start: First day (inclusive) of the carnival period, as a
            ``yyyy-MM-dd`` string. Unlike the other events this one is tied to
            a concrete year; the default keeps the original 2024 window.
        carnival_end: Last day (inclusive) of the carnival period.

    Returns:
        ``df`` with the additional ``seasonal_event`` column.
    """
    month_day = date_format(col("date_added"), "MM-dd")

    seasonal_event = (
        when(month_day.between("12-02", "12-28"), "Christmas")
        .when(month_day.between("01-01", "01-02"), "New Year")
        .when(col("date_added").between(carnival_start, carnival_end), "Carnival")
        .otherwise("Normal")
    )
    return df.withColumn("seasonal_event", seasonal_event)

def assign_main_genre(genre):
    """Map a free-text genre description to a list of main genre labels.

    Each label is added when at least one of its keywords occurs as a
    substring of ``genre``; the labels are returned in the fixed order of the
    table below.

    Args:
        genre: Genre text (a single sub-genre or several joined together), or
            ``None``.

    Returns:
        A non-empty list of labels. ``["other"]`` is returned when nothing
        matches, including for an empty string or ``None``.
    """
    # A null column value reaches a UDF as None; treat it like an empty string
    # instead of failing on the substring tests.
    if genre is None:
        return ["other"]

    keyword_groups = (
        ("pop", ("pop",)),
        ("rock", ("rock",)),
        ("hip hop", ("hip hop",)),
        ("jazz", ("jazz",)),
        ("electronic", ("electronic", "edm", "trap")),
        # The leading quote/space keeps "trap" from being counted as rap.
        ("rap", ("'rap", " rap")),
        ("country", ("country",)),
        ("r&b/soul", ("r&b", "soul")),
        ("reggaeton/latino", ("reggaeton", "latino")),
        ("classical/orchestral", ("classical", "orchestral")),
        ("unknown", ("unknown",)),
        ("alternative/indie", ("alternative", "alt", "indie")),
    )

    # Text that begins with "rap" has no preceding quote or space, so the two
    # rap keywords alone would miss it.
    starts_with_rap = isinstance(genre, str) and genre.startswith("rap")

    main_genres = []
    for label, keywords in keyword_groups:
        matched = any(keyword in genre for keyword in keywords)
        if label == "rap" and starts_with_rap:
            matched = True
        if matched:
            main_genres.append(label)

    return main_genres if main_genres else ["other"]

def transform_artist_genres(df):
    """Derive cleaned sub-genres and main genres from ``artist_genres``.

    Adds ``artist_subgenres`` (``artist_genres`` with the characters ``[``,
    ``]`` and ``'`` removed) and ``artist_main_genres`` (the result of
    ``assign_main_genre_udf`` on that text), drops ``artist_genres`` and
    finally removes rows duplicated on ``track_id``, ``artist_main_genres``
    and ``date_added``.
    """
    # Raw string: "\[" and "\]" are not valid escapes in a normal literal and
    # trigger a warning on recent Python versions. The pattern is unchanged.
    list_punctuation = r"[\[\]']"

    df = df.withColumn("artist_subgenres", regexp_replace(col("artist_genres"), list_punctuation, "")) \
           .withColumn("artist_main_genres", assign_main_genre_udf("artist_subgenres"))

    df = remove_conflicting_columns(df, ["artist_genres"])
    df = df.dropDuplicates(subset=["track_id", "artist_main_genres", "date_added"])
    return df

def add_track_age(df):
    """Add age, recency, duration and calendar columns to ``df``.

    New columns, added in this order:

    * ``track_age`` - days from ``track_album_release_date`` to ``date_added``
    * ``released_within_a_month`` - 1 when ``track_age`` <= 31, otherwise 0
    * ``released_within_a_year`` - 1 when ``track_age`` <= 365, otherwise 0
    * ``duration_min`` - ``duration_ms`` divided by 60000
    * ``day_of_week`` - ``date_added`` formatted with pattern "EEEE"
    * ``month`` - ``date_added`` formatted with pattern "MM"
    """
    added_on = col("date_added")
    age = col("track_age")

    derived_columns = [
        ("track_age", datediff(added_on, col("track_album_release_date"))),
        ("released_within_a_month", when(age <= 31, 1).otherwise(0)),
        ("released_within_a_year", when(age <= 365, 1).otherwise(0)),
        ("duration_min", expr("duration_ms / 60000")),
        ("day_of_week", date_format(added_on, "EEEE")),
        ("month", date_format(added_on, "MM")),
    ]
    for name, expression in derived_columns:
        df = df.withColumn(name, expression)
    return df

def _compare_released_within(df, column, flag_column, num_tracks=50):
    """Shared implementation of the ``compare_released_within_*`` reports.

    Groups ``df`` by ``column``, ``flag_column`` and ``total_days`` and
    computes two measures per group:

    * ``group_popularity`` - ``SUM(1/rank) * COUNT(*)`` divided by
      ``total_days ** 2 * num_tracks * H``, where ``H`` is the sum of ``1/i``
      for ``i`` in ``1..num_tracks``.
    * ``average_percent_of_top_50`` - row count divided by
      ``num_tracks * total_days``, times 100.

    Args:
        df: DataFrame containing ``rank``, ``total_days``, ``column`` and
            ``flag_column``.
        column: Name of the period column to group by.
        flag_column: Name of the 0/1 recency flag column to compare on.
        num_tracks: Chart length used for normalisation.

    Returns:
        DataFrame ordered by ``column`` with the columns ``column``,
        ``flag_column``, ``group_popularity`` and ``average_percent_of_top_50``.
    """
    max_rank_sum = sum(1/i for i in range(1, num_tracks + 1))
    const = num_tracks * max_rank_sum

    # Popularity criterion for the group.
    popularity_criteria = expr("(SUM(1/rank) * COUNT(*)) / (POW(total_days, 2) * {})".format(const))

    # Compare chart presence of recently released tracks with the other tracks.
    return (
        df.groupBy(column, flag_column, "total_days")
        .agg(
            popularity_criteria.alias("group_popularity"),
            (count("*") / (num_tracks * col("total_days")) * 100).alias("average_percent_of_top_50"),
        )
        .orderBy(column)
        .select(column, flag_column, "group_popularity", "average_percent_of_top_50")
    )


def compare_released_within_a_month(df, column):
    """Compare tracks by the ``released_within_a_month`` flag, per ``column`` value.

    Returns a DataFrame with ``column``, ``released_within_a_month``,
    ``group_popularity`` and ``average_percent_of_top_50``.
    """
    return _compare_released_within(df, column, "released_within_a_month")


def compare_released_within_a_year(df, column):
    """Compare tracks by the ``released_within_a_year`` flag, per ``column`` value.

    Returns a DataFrame with ``column``, ``released_within_a_year``,
    ``group_popularity`` and ``average_percent_of_top_50``.
    """
    return _compare_released_within(df, column, "released_within_a_year")

def get_top_tracks(df, column):
    """Return the most popular tracks for every value of ``column``.

    Rows are grouped per track (together with ``column``,
    ``artist_main_genres``, the descriptive columns and ``total_days``). Each
    group gets ``days_on_chart`` (count of ``track_id`` rows),
    ``track_popularity`` (``SUM(1/rank) * COUNT(*) * 100`` divided by
    ``total_days * 50``) and ``average_track_age``. Tracks are then ranked by
    popularity inside each ``column`` value and those with rank <= 5 are kept,
    so ties can yield more than five rows per value.
    """
    num_tracks = 50
    descriptive_columns = ['track_name', 'artist', 'track_album_name']

    # Popularity criterion for a single track.
    popularity_criteria = expr("(SUM(1/rank) * COUNT(*)*100) / (total_days*{})".format(num_tracks))

    per_track = df.groupBy(
        column, "artist_main_genres", *descriptive_columns, "track_id", "total_days"
    ).agg(
        count("track_id").alias("days_on_chart"),
        popularity_criteria.alias("track_popularity"),
        avg("track_age").alias("average_track_age"),
    )

    per_track = per_track.select(
        column,
        *descriptive_columns,
        "track_popularity",
        "artist_main_genres",
        "days_on_chart",
        "average_track_age",
    ).orderBy(column, col("track_popularity").desc())

    # Window over each value of `column`, most popular track first.
    by_popularity = Window().partitionBy(column).orderBy(per_track["track_popularity"].desc())

    # Rank inside the window, keep the top 5 ranks and drop the helper column.
    ranked = per_track.withColumn("rank", rank().over(by_popularity))
    return ranked.filter("rank <= 5").drop("rank")

def get_top_artists(df, column):
    """Return the most popular artists for every value of ``column``.

    Rows are grouped per artist (together with ``column``,
    ``artist_main_genres`` and ``total_days``). Each group gets the number of
    distinct tracks, a popularity score, the average track age, the set of
    album names and the average of every audio feature. Artists are ranked by
    popularity inside each ``column`` value and those with rank <= 5 are kept,
    so ties can yield more than five rows per value.
    """
    descriptive_columns = ['artist']
    averaged_features = [
        "duration_min", "danceability", "energy", "key", "loudness", "mode",
        "speechiness", "acousticness", "instrumentalness", "liveness",
        "valence", "tempo",
    ]

    num_tracks = 50
    max_rank_sum = sum(1/i for i in range(1, 50+1))
    const = num_tracks*max_rank_sum

    # Popularity criterion for an artist.
    popularity_criteria = expr("(SUM(1/rank) * COUNT(*)) / (POW(total_days, 2)*{})".format(const))

    # One "average_<feature>" aggregate per audio feature, in list order.
    feature_averages = [avg(name).alias("average_" + name) for name in averaged_features]
    feature_average_names = ["average_" + name for name in averaged_features]

    # Aggregate the data for every artist.
    per_artist = df.groupBy(
        column, "artist_main_genres", *descriptive_columns, "artist_id", "total_days"
    ).agg(
        countDistinct("track_id").alias("number_of_tracks"),
        popularity_criteria.alias("artist_popularity"),
        avg("track_age").alias("average_track_age"),
        expr("collect_set(track_album_name)").alias("artist_albums"),
        *feature_averages,
    ).orderBy(column, col("artist_popularity").desc())

    per_artist = per_artist.select(
        column,
        *descriptive_columns,
        "artist_popularity",
        "number_of_tracks",
        "average_track_age",
        "artist_main_genres",
        "artist_albums",
        *feature_average_names,
    )

    # Window over each value of `column`, most popular artist first.
    by_popularity = Window().partitionBy(column).orderBy(per_artist["artist_popularity"].desc())

    # Rank inside the window, keep the top 5 ranks and drop the helper column.
    ranked = per_artist.withColumn("rank", rank().over(by_popularity))
    return ranked.filter("rank <= 5").drop("rank")
    
def get_top_genres(df, column):
    """Return the most popular main genres for every value of ``column``.

    ``artist_subgenres`` is split on commas and exploded into one row per
    sub-genre; each sub-genre is mapped to its main genres, which are exploded
    as well. Popularity and the distinct track count are computed per main
    genre after removing duplicates on ``track_id``, ``track_age`` and
    ``artist_main_genres``; the set of contributing sub-genres is collected
    from the un-deduplicated rows. Genres are ranked by popularity inside each
    ``column`` value and those with rank <= 5 are kept.
    """
    # One row per (sub-genre, main genre) combination.
    exploded = (
        df.withColumn("genres", split(df["artist_subgenres"], ","))
        .withColumn("genres", explode("genres"))
        .withColumn("artist_main_genres", assign_main_genre_udf("genres"))
        .withColumn("artist_main_genres", explode("artist_main_genres"))
    )

    deduplicated = exploded.dropDuplicates(subset=["track_id", "track_age", "artist_main_genres"])

    num_tracks = 50
    max_rank_sum = sum(1/i for i in range(1, 50+1))
    const = num_tracks*max_rank_sum

    # Popularity criterion for a genre.
    popularity_criteria = expr("(SUM(1/rank) * COUNT(*)) / (POW(total_days, 2)*{})".format(const))

    # Aggregate the data for every main genre.
    per_genre = deduplicated.groupBy(column, "total_days", "artist_main_genres").agg(
        popularity_criteria.alias("genre_popularity"),
        countDistinct("track_id").alias("number_of_tracks"),
    )

    # Collect the sub-genres that belong to every main genre.
    subgenres_per_genre = exploded.groupBy(column, "artist_main_genres").agg(
        expr("collect_set(genres)").alias("artist_subgenres")
    ).dropDuplicates()

    # Attach the sub-genre sets to the aggregated genres.
    per_genre = per_genre.join(
        subgenres_per_genre, on=["artist_main_genres", column]
    ).orderBy(column, col("genre_popularity").desc())
    per_genre = per_genre.select(
        column, "artist_main_genres", "artist_subgenres", "genre_popularity", "number_of_tracks"
    )

    # Window over each value of `column`, most popular genre first.
    by_popularity = Window().partitionBy(column).orderBy(per_genre["genre_popularity"].desc())

    # Rank inside the window, keep the top 5 ranks and drop the helper column.
    ranked = per_genre.withColumn("rank", rank().over(by_popularity))
    return ranked.filter("rank <= 5").drop("rank")

def get_feature_summary(df):
    """Return ``describe()`` statistics for the audio-feature columns of ``df``."""
    # Features included in the summary.
    audio_features = [
        'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
        'duration_min',
    ]
    features_only = df.select(audio_features)
    return features_only.describe()

def process_period_group(df, column):
    """Compute all reports for one grouping column and write them as parquet.

    ``total_days`` (the number of distinct ``date_added`` values per value of
    ``column``) is joined onto ``df`` first. Every report is then written
    below ``analysis/<column>/<current date>/``; the date is obtained from
    Spark's ``current_date()`` through the module-level ``spark`` session.

    The writes use the default save mode, so an already existing target path
    makes the corresponding write fail.

    Args:
        df: Prepared DataFrame containing ``column`` and ``date_added`` plus
            the columns required by the individual report functions.
        column: Name of the grouping column (for example a period label).

    Returns:
        None. The function is used for its side effect of writing files.
    """
    # Number of distinct chart days per value of the grouping column.
    total_days_df = df.groupBy(column).agg(countDistinct("date_added").alias("total_days"))

    # Attach that count to every row of the input data.
    df = df.join(total_days_df, on=column, how="left")

    date = spark.sql("SELECT current_date() AS current_timestamp").first().current_timestamp
    period_path = f"analysis/{column}/{date}/"

    reports = (
        (compare_released_within_a_month, "compare_with_new_month.parquet"),
        (compare_released_within_a_year, "compare_with_new_year.parquet"),
        (get_top_tracks, "top_tracks.parquet"),
        (get_top_artists, "top_artists.parquet"),
        (get_top_genres, "top_genres.parquet"),
    )
    for build_report, file_name in reports:
        build_report(df, column).write.parquet(period_path + file_name)

    # One describe() summary per distinct value, tagged with that value.
    unique_values = df.select(column).distinct().collect()
    summaries = []
    for value_row in unique_values:
        value = value_row[column]
        summary = get_feature_summary(df.filter(df[column] == value))
        summaries.append(summary.withColumn(column, lit(value)))

    # Without any value there is nothing to union or write; previously this
    # case ended in an AttributeError on None.
    if not summaries:
        return

    summary_df = reduce(lambda combined, summary: combined.union(summary), summaries)
    summary_df.write.parquet(period_path + "summary_df.parquet")

# Spark UDF (user defined function) wrapping assign_main_genre; it is declared
# to return an array of strings.
assign_main_genre_udf = udf(f=assign_main_genre, returnType=ArrayType(StringType()))

In [None]:
# Drop three columns from the static table, then flag chart rows whose
# track_id does not occur in it.
static_df = remove_conflicting_columns(static_df, ["track_name", "track_artist", "popularity"])
dynamic_df = mark_missing_tracks(dynamic_df, static_df)

# Join both tables, cast the date columns and apply the per-track window.
merged_df = assign_unique_track_id(
    assure_date_type(
        merge_dataframes(dynamic_df, static_df),
        ['date_added', 'track_album_release_date'],
    )
)

# Keep only rows that were not flagged as missing.
merged_df = merged_df.filter(col("missing") == False).drop("missing")

# Add the derived period, age and genre columns, in this order.
for enrich in (
    add_seasonal_event_column,
    add_track_age,
    add_season_column,
    transform_artist_genres,
):
    merged_df = enrich(merged_df)

In [None]:
# Grouping columns to build reports for. The full set would be
# ['month', 'seasonal_event', 'season', 'day_of_week'].
periods = ['seasonal_event']

# Measure the wall-clock time of the whole report generation.
start_time = time.time()
for period in periods:
    process_period_group(merged_df, period)
end_time = time.time()

elapsed_time = end_time - start_time
print(f"Czas wykonania: {elapsed_time} sekundy")

In [None]:
# Stop the Spark session at the end of the notebook.
spark.stop()