In [104]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, translate, concat, concat_ws, split, isnan, udf
from pyspark.sql.types import IntegerType,BooleanType,DateType, FloatType

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.functions import vector_to_array

import plotly.express as px

from scipy.spatial.distance import euclidean

In [67]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Iniciando com Spark') \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [68]:
def read_file(
    file_location:str,
    infer_schema = "true",
    first_row_is_header = "true",
    delimiter = "|",
    file_type = 'csv'
    ):
    
    df_spark = spark.read.format(file_type) \
    .option('inferSchema', infer_schema) \
    .option('header', first_row_is_header) \
    .option('sep', delimiter) \
    .load(file_location)

    return df_spark

In [69]:
def replace_multiples_characters(column,
                                list_replace:list, 
                                new_characters:str
                                ) -> []:
    
    for character in list_replace:
        column = column.str.replace(character, new_characters)
    return column

In [70]:
def alter_type_column(
        df,
        column_name:str,
        new_type:str,
    ):
    
    df_ajusted = df.withColumn(column_name, df[column_name].cast(new_type))
        
    return df_ajusted

In [71]:
def remove_string_rows(
        df,
        cols:[],
        type_of_col:str
):
    for i in cols:
        df = df.withColumn("isNumber", col(str(i)).cast(type_of_col).isNotNull()).filter(col("isNumber") == True)
        df = df.drop("isNumber")

    return df

In [94]:
def project_clusters(
    df_data_sp,
    columns_not_object:[],
    k_pca=2,
    SEED=1224
    ):

    pca_pipeline = Pipeline(
        stages = [
            VectorAssembler(inputCols=columns_not_object,outputCol='features'),
            StandardScaler(inputCol='features', outputCol='features_scaled'),
            PCA(k=k_pca, inputCol='features_scaled', outputCol='pca_features')
            ]
        )

    model_pca_pipeline = pca_pipeline.fit(df_data_sp)
    projection = model_pca_pipeline.transform(df_data_sp)

    kmeans = KMeans(k=20, featuresCol='pca_features',\
        predictionCol='cluster_pca', seed=SEED)
    model_kmeans = kmeans.fit(projection)
    projection_kmeans = model_kmeans.transform(projection)

    projection_kmeans = projection_kmeans.withColumn('x', vector_to_array('pca_features')[0])\
        .withColumn('y', vector_to_array('pca_features')[1])
    
    return projection_kmeans


In [107]:
def created_recommendation (
    projection_kmeans,
    selected_music = 'Teardrops_Bring Me The Horizon',
    columns_of_interesse = ['pca_features', 'cluster_pca',\
                        'artists','name', 'year', 'name_music_artists']
    ):
    
    def calculate_distance(value):
        return euclidean(music_components, value)

    selected_colections = projection_kmeans.filter(projection_kmeans.name_artists == selected_music).select(columns_of_interesse).take(1)[0]

    recommended_musics =  projection_kmeans.where(projection_kmeans.cluster_pca == selected_colections[1])

    music_components = recommended_musics.filter(recommended_musics\
        .name_artists == selected_colections[5])\
        .dropDuplicates(['name_artists'])\
        .select('pca_features').collect()[0][0]

    udf_calculate_distance = udf(
        calculate_distance, 
        FloatType()
        )

    recommended_musics_dist = recommended_musics.withColumn(
        'Dist', 
        udf_calculate_distance('pca_features')
        )
    recommendation = spark.createDataFrame(
        recommended_musics_dist.sort('Dist').take(11)
        ).select(['name','artists','id', 'Dist'])
    recommendation = recommendation.where(recommendation.Dist>0)
    
    return recommendation


In [72]:
df_data = read_file('./data/data.csv', delimiter=',')

                                                                                

In [73]:
df_data.show(5)

+-------+----+------------+--------------------+------------------+-----------+------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-----------------+
|valence|year|acousticness|             artists|      danceability|duration_ms|energy|explicit|                  id|instrumentalness|key|liveness|loudness|mode|                name|popularity|release_date|speechiness|            tempo|
+-------+----+------------+--------------------+------------------+-----------+------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-----------------+
| 0.0594|1921|       0.982|['Sergei Rachmani...|             0.279|     831667| 0.211|       0|4BJqT0PrAfrxzMOxy...|           0.878| 10|   0.665| -20.096|   1|Piano Concerto No...|         4|        1921|     0.0366|           80.954|
|  0.963|1921|       0.732|      ['Dennis Day']|0.819000

In [97]:

float_columns = ['valence','acousticness', 'danceability', \
                'instrumentalness', 'liveness', 'loudness',\
                'speechiness', 'tempo']
int_columns = ['duration_ms', 'energy', 'key', 'mode', \
                'popularity', 'release_date', 'explicit']

for i in float_columns:
    df_data = alter_type_column(
        df_data,
        column_name=i,
        new_type='float'
    )
for i in int_columns:
    df_data = alter_type_column(
        df_data,
        column_name=i,
        new_type='int'
    )


In [75]:
df_data = df_data.withColumn("artists", translate("artists", "[]'", ""))
df_data = df_data.withColumn("artists", translate('artists', '"', ''))
df_data = df_data.withColumn("artists", translate("artists", ",", "|"))

In [76]:
df_data = df_data.withColumn('name_music_artists', (concat_ws('_',df_data.artists, df_data.name)))

In [77]:
df_data.show()

+-------+----+------------+--------------------+------------+-----------+-------------------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+
|valence|year|acousticness|             artists|danceability|duration_ms|             energy|explicit|                  id|instrumentalness|key|liveness|loudness|mode|                name|popularity|release_date|speechiness|  tempo|  name_music_artists|
+-------+----+------------+--------------------+------------+-----------+-------------------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+
| 0.0594|1921|       0.982|Sergei Rachmanino...|       0.279|     831667|              0.211|       0|4BJqT0PrAfrxzMOxy...|           0.878| 10|   0.665| -20.096|   1|Piano Concerto No...|         4|        1921|     0.0366| 80.954|Sergei

In [78]:
# df_data = df_data.withColumn('year', split(df_data['release_date'], '-').getItem(0))

In [79]:
df_columns = df_data.columns
df_data = df_data[df_columns]

df_data = df_data.na.drop(subset=['release_date'])
# df_data = df_data.dropDuplicates(subset=['release_date'])


columns_not_object = df_data.columns
columns_not_object.remove('artists')
columns_not_object.remove('id')
columns_not_object.remove('name')
columns_not_object.remove('name_music_artists')
columns_not_object.remove('explicit')


In [80]:
# df_data.summary().show()

In [81]:
df_data.show()

+-------+----+------------+--------------------+------------+-----------+-------------------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+
|valence|year|acousticness|             artists|danceability|duration_ms|             energy|explicit|                  id|instrumentalness|key|liveness|loudness|mode|                name|popularity|release_date|speechiness|  tempo|  name_music_artists|
+-------+----+------------+--------------------+------------+-----------+-------------------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+
| 0.0594|1921|       0.982|Sergei Rachmanino...|       0.279|     831667|              0.211|       0|4BJqT0PrAfrxzMOxy...|           0.878| 10|   0.665| -20.096|   1|Piano Concerto No...|         4|        1921|     0.0366| 80.954|Sergei

In [82]:
import numpy as np

name_list = df_data.select('release_date').rdd.flatMap(lambda x: x).collect()
new = np.unique(name_list)
# df_data['release_date'].colect().unique().show()



                                                                                

In [83]:
year = [x for x in np.unique(name_list) if x is int]
name_list[1]

'1921'

In [96]:
df_data_1 = df_data.filter(
    (df_data.speechiness<1) 
    & 
    (df_data.energy < 1)
    &
    (df_data.explicit < 1)
    )



In [85]:

df_data = remove_string_rows(
        df=df_data,
        cols=float_columns,
        type_of_col= "float"
)
df_data = remove_string_rows(
        df=df_data,
        cols=int_columns,
        type_of_col= "int"
)

df_data.summary().show()



+-------+------------------+------------------+-------------------+-----------------+-------------------+------------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+------------------+--------------------+
|summary|           valence|              year|       acousticness|          artists|       danceability|       duration_ms|             energy|           explicit|                  id|  instrumentalness|               key|           liveness|           loudness|               mode|                name|        popularity|      release_date|        speechiness|             tempo|  name_music_artists|
+-------+------------------+------------------+-------------------+-----------------+-------------------+------------------+-------------------+-------------------+--------------------+---------

                                                                                

In [99]:
projection_kmeans = project_clusters(
    df_data_sp=df_data,
    columns_not_object=columns_not_object
)
projection_kmeans.show(5)

                                                                                

+-------+----+------------+--------------------+------------+-----------+------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+--------------------+--------------------+--------------------+-----------+------------------+-------------------+
|valence|year|acousticness|             artists|danceability|duration_ms|energy|explicit|                  id|instrumentalness|key|liveness|loudness|mode|                name|popularity|release_date|speechiness|  tempo|  name_music_artists|            features|     features_scaled|        pca_features|cluster_pca|                 x|                  y|
+-------+----+------------+--------------------+------------+-----------+------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+--------------------+--------------------+--------

In [108]:
recommendation = created_recommendation(
    projection_kmeans
    )
recommendation.show()

AttributeError: 'DataFrame' object has no attribute 'name_artists'

+-------+----+------------+--------------------+------------+-----------+-------------------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+
|valence|year|acousticness|             artists|danceability|duration_ms|             energy|explicit|                  id|instrumentalness|key|liveness|loudness|mode|                name|popularity|release_date|speechiness|  tempo|  name_music_artists|
+-------+----+------------+--------------------+------------+-----------+-------------------+--------+--------------------+----------------+---+--------+--------+----+--------------------+----------+------------+-----------+-------+--------------------+
| 0.0594|1921|       0.982|Sergei Rachmanino...|       0.279|     831667|              0.211|       0|4BJqT0PrAfrxzMOxy...|           0.878| 10|   0.665| -20.096|   1|Piano Concerto No...|         4|        1921|     0.0366| 80.954|Sergei



+-------+------------------+------------------+-------------------+-----------------+-------------------+------------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+-------------------+-------------------+--------------------+------------------+------------------+-------------------+------------------+--------------------+
|summary|           valence|              year|       acousticness|          artists|       danceability|       duration_ms|             energy|           explicit|                  id|  instrumentalness|               key|           liveness|           loudness|               mode|                name|        popularity|      release_date|        speechiness|             tempo|  name_music_artists|
+-------+------------------+------------------+-------------------+-----------------+-------------------+------------------+-------------------+-------------------+--------------------+---------

                                                                                

['valence',
 'year',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'popularity',
 'release_date',
 'speechiness',
 'tempo']