# Configuracion e Importaciones

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import findspark
from pyspark.sql import SparkSession
import psycopg2
from pyspark.sql import Row
# --------- Configurations ---------
#DATABASE
DB_HOST = 'host.docker.internal'
DB_PORT = '5432'
DB_NAME = 'music_discovery_db'
DB_USER = 'postgres'
DB_PASS = 'Elilouyea1' 
#API SPOTIFY
CLIENT_ID = '524a3c52e62b4b4facef52683534465d'
CLIENT_SECRET = '77c92fc62e0845d38ae328167fb5962a'

# Creacion de la DB y las TABLAS a utilizar

In [None]:
# Conexión a PostgreSQL (base de datos default para verificar existencia de tu base de datos)
conn = psycopg2.connect(database="postgres", user=DB_USER, password=DB_PASS, host=DB_HOST, port=DB_PORT)
conn.autocommit = True  # Esto es necesario para crear una base de datos y para operaciones DDL
cur = conn.cursor()

# Verifica si la base de datos 'music_discovery_db' ya existe
cur.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = 'music_discovery_db'")
exists = cur.fetchone()

# Si no existe, la crea
if not exists:
    cur.execute("CREATE DATABASE music_discovery_db")

# Cierra esta conexión y cursor porque vamos a conectarnos a la nueva base de datos
cur.close()
conn.close()

# Ahora nos conectamos a la base de datos 'music_discovery_db'
conn = psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST, port=DB_PORT)
cur = conn.cursor()

# Crea las tablas si no existen
cur.execute("""
CREATE TABLE IF NOT EXISTS public.search_terms (
    id serial4 NOT NULL,
    term text NOT NULL,
    CONSTRAINT search_terms_pkey PRIMARY KEY (id)
);
""")

# Chequear que existan registros en la tabla search_terms y si no, insertar algunos
cur.execute("SELECT COUNT(*) FROM public.search_terms")
count = cur.fetchone()[0]

if count == 0:
    cur.execute("INSERT INTO public.search_terms (term) VALUES ('Imagine');")
    cur.execute("INSERT INTO public.search_terms (term) VALUES ('Yesterday');")
    cur.execute("INSERT INTO public.search_terms (term) VALUES ('Stairway to Heaven');")
    cur.execute("INSERT INTO public.search_terms (term) VALUES ('Bohemian Rhapsody');")



cur.execute("""
CREATE TABLE IF NOT EXISTS public.search_results (
    id serial4 NOT NULL,
    search_term_id int4 NULL,
    name text NULL,
    artist text NULL,
    album text NULL,
    album_id text NULL,
    release_date date NULL,
    length int4 NULL,
    popularity int4 NULL,
    duration int4 NULL,
    search_date timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT search_results_pkey PRIMARY KEY (id),
    CONSTRAINT search_results_search_term_id_fkey FOREIGN KEY (search_term_id) REFERENCES public.search_terms(id)
);
""")
print("Tables created successfully")

## Funciones de utilidad

In [None]:
def format_release_date(date_str):
    if len(date_str) == 4:
        return f"{date_str}-01-01"
    elif len(date_str) == 7:
        return f"{date_str}-01"
    else:
        return date_str

print("Functions created successfully")

# Autenticacion y Conexiones

In [None]:
client_credentials_manager = SpotifyClientCredentials(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

# Consultar términos de búsqueda
cur.execute("SELECT id, term FROM search_terms")
terms = cur.fetchall()
# Configuración de Spark
findspark.init()
spark = SparkSession.builder.appName("Spotify Data Analysis").getOrCreate()

print("Spark session created successfully")

# Busqueda en Spotify y Almacenamiento en la lista

In [None]:
# Paralelizar la búsqueda en Spotify
def search_spotify(term):
    term_id, term_name = term[0], term[1]
    results = sp.search(q=term_name, limit=10)
    tracks_list = []
    for track in results['tracks']['items']:
        track_data = {
            'search_term_id': term_id,
            'name': track['name'],
            'artist': track['artists'][0]['name'],
            'album': track['album']['name'],
            'album_id': track['album']['id'],
            'release_date': format_release_date(track['album']['release_date']),
            'length': track['duration_ms'],
            'popularity': track['popularity'],
            'duration': track['duration_ms']
        }
        tracks_list.append(track_data)
    return tracks_list


all_tracks_list = [search_spotify(term) for term in terms]
all_tracks_flat = [item for sublist in all_tracks_list for item in sublist]

# Convertir la lista de pistas a un DataFrame de Spark usando all_tracks_flat
rdd = spark.sparkContext.parallelize(all_tracks_flat)
df_spark = rdd.map(lambda x: Row(**x)).toDF()

# Análisis básico con Spark
df_spark.groupBy("artist").count().show()

# Inserción en la base de datos
insert_query = """
    INSERT INTO search_results (search_term_id, name, artist, album, album_id, release_date, length, popularity, duration) 
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

for row in all_tracks_flat:
    if not isinstance(row, dict):  # Diagnóstico para identificar elementos problemáticos
        print(f"Unexpected item in all_tracks_flat: {row}")
        continue  # Salta este elemento y sigue con el siguiente

    data = (row['search_term_id'], row['name'], row['artist'], row['album'], row['album_id'], row['release_date'], row['length'], row['popularity'], row['duration'])
    cur.execute(insert_query, data)
    
conn.commit()

print("Data inserted successfully")

# Limpieza y cierre

In [None]:
cur.close()
conn.close()
spark.stop()
print("Connection closed successfully")