In [1]:
import os
import yaml
import logging
import redshift_connector
import awswrangler as wr
import pandas as pd

In [2]:
with open(r'C:\Users\Nico\Documents\TRABAJOS\music-dwh\airflow\dags\.env\.cfg\creds.yaml', 'r') as creds:
    creds = yaml.safe_load(creds)
    host = creds['redshift']['host']
    port = creds['redshift']['port']
    db = creds['redshift']['db']
    user = creds['redshift']['user']
    password = creds['redshift']['password']
    logging.info('Credentials read.')

In [3]:
conn = redshift_connector.connect(database=db, user=user, password=password, host=host, port=port)

In [4]:
with conn.cursor() as cur:
    cur.execute('SELECT * FROM "2024_domingo_nicolas_morelli_schema"."staging_daily_artists"')
    daily = cur.fetch_dataframe()

In [5]:
daily

Unnamed: 0,name,tag,listeners,playcount,rank,stats_date
0,Iron Maiden,Heavy Metal,2731939,216835483,1,2024-09-20
1,Black Sabbath,Heavy Metal,3583951,156159230,2,2024-09-20
2,Ghost,Heavy Metal,1239942,105183064,3,2024-09-20
3,Judas Priest,Heavy Metal,1816650,88055143,4,2024-09-20
4,Motörhead,Heavy Metal,1991508,80972958,5,2024-09-20
...,...,...,...,...,...,...
595,Black Tongue,Deathcore,55274,1656709,46,2024-09-20
596,War from a Harlots Mouth,Deathcore,100908,2825466,47,2024-09-20
597,Distant,Deathcore,73658,1281125,48,2024-09-20
598,Bodysnatcher,Deathcore,63540,1292862,49,2024-09-20


In [6]:
def handle_new_artists(daily):
    daily = daily.rename(columns={'name': 'artist_name', 'rank': 'current_rank', 'stats_date': 'effective_date'})
    daily['max_rank'] = daily['current_rank']
    daily['new_listeners'] = daily['listeners']
    daily['new_plays'] = daily['playcount']
    daily['expiration_date'] = '9999-12-31'
    daily['last_known'] = 'Yes'
    daily['consecutive_times_in_top_50'] = 1

    return daily


def handle_repeated_artists(daily):
    daily['new_plays'] = daily['playcount_daily'].astype(int) - daily['playcount_old'].astype(int).fillna(0)
    daily['playcount'] = daily['playcount_daily'].astype(int)
    daily['new_listeners'] = daily['listeners_daily'].astype(int) - daily['listeners_old'].astype(int).fillna(0)
    daily['listeners'] = daily['listeners_daily'].astype(int)
    daily['max_rank'] = daily[['current_rank_daily', 'max_rank']].astype(int).min(axis=1)
    daily['current_rank'] = daily['current_rank_daily'].astype(int)
    daily['expiration_date'] = '9999-12-31'
    daily['effective_date'] = daily['effective_date_daily']
    daily['consecutive_times_in_top_50'] += 1

    cols_to_drop = []
    for col in daily.columns:
        if '_daily' in col or '_old' in col:
            cols_to_drop.append(col)
    cols_to_drop.append('id')

    daily = daily.drop(cols_to_drop, axis=1)

    return daily

def handle_out_artists(daily):
    daily['new_listeners'] = 0
    daily['new_plays'] = 0
    daily['current_rank'] = pd.NA
    daily = daily.drop('id', axis=1)

    return daily

In [7]:
dummy_artists = [{
    'id': 1,
    'artist_id': 1,
    'artist_name': 'Iron Maiden',
    'max_rank': 40,
    'current_rank': 40,
    'listeners': 1,
    'new_listeners': 1,
    'playcount': 1,
    'new_plays': 1,
    'consecutive_times_in_top_50': 20,
    'effective_date': '2024-09-20',
    'expiration_date': '9999-12-31',
    'last_known': 'Yes'
},
{
    'id': 1,
    'artist_id': 2,
    'artist_name': 'Miranda',
    'max_rank': 49,
    'current_rank': 50,
    'listeners': 1,
    'new_listeners': 1,
    'playcount': 1,
    'new_plays': 1,
    'consecutive_times_in_top_50': 3,
    'effective_date': '2024-09-20',
    'expiration_date': '9999-12-31',
    'last_known': 'Yes'
}]

artists = pd.DataFrame(dummy_artists)

In [8]:
daily = daily.rename(columns={'name': 'artist_name', 'rank': 'current_rank', 'stats_date': 'effective_date'})

In [9]:
daily_new_artists = daily[~daily['artist_name'].isin(artists['artist_name'])]
daily_repeated_artists = daily.merge(artists, on='artist_name', how='inner', suffixes=['_daily', '_old'])
daily_out_artists = artists[~artists['artist_name'].isin(daily['artist_name'])]

In [10]:
daily_repeated_artists

Unnamed: 0,artist_name,tag,listeners_daily,playcount_daily,current_rank_daily,effective_date_daily,id,artist_id,max_rank,current_rank_old,listeners_old,new_listeners,playcount_old,new_plays,consecutive_times_in_top_50,effective_date_old,expiration_date,last_known
0,Iron Maiden,Heavy Metal,2731939,216835483,1,2024-09-20,1,1,40,40,1,1,1,1,20,2024-09-20,9999-12-31,Yes


In [11]:
daily_out_artists

Unnamed: 0,id,artist_id,artist_name,max_rank,current_rank,listeners,new_listeners,playcount,new_plays,consecutive_times_in_top_50,effective_date,expiration_date,last_known
1,1,2,Miranda,49,50,1,1,1,1,3,2024-09-20,9999-12-31,Yes


In [12]:
daily_new_artists = handle_new_artists(daily_new_artists)

In [13]:
daily_new_artists

Unnamed: 0,artist_name,tag,listeners,playcount,current_rank,effective_date,max_rank,new_listeners,new_plays,expiration_date,last_known,consecutive_times_in_top_50
1,Black Sabbath,Heavy Metal,3583951,156159230,2,2024-09-20,2,3583951,156159230,9999-12-31,Yes,1
2,Ghost,Heavy Metal,1239942,105183064,3,2024-09-20,3,1239942,105183064,9999-12-31,Yes,1
3,Judas Priest,Heavy Metal,1816650,88055143,4,2024-09-20,4,1816650,88055143,9999-12-31,Yes,1
4,Motörhead,Heavy Metal,1991508,80972958,5,2024-09-20,5,1991508,80972958,9999-12-31,Yes,1
5,Ozzy Osbourne,Heavy Metal,2493047,69054377,6,2024-09-20,6,2493047,69054377,9999-12-31,Yes,1
...,...,...,...,...,...,...,...,...,...,...,...,...
595,Black Tongue,Deathcore,55274,1656709,46,2024-09-20,46,55274,1656709,9999-12-31,Yes,1
596,War from a Harlots Mouth,Deathcore,100908,2825466,47,2024-09-20,47,100908,2825466,9999-12-31,Yes,1
597,Distant,Deathcore,73658,1281125,48,2024-09-20,48,73658,1281125,9999-12-31,Yes,1
598,Bodysnatcher,Deathcore,63540,1292862,49,2024-09-20,49,63540,1292862,9999-12-31,Yes,1


In [14]:
daily_repeated_artists = handle_repeated_artists(daily_repeated_artists)

In [15]:
daily[daily['artist_name'] == 'Iron Maiden']

Unnamed: 0,artist_name,tag,listeners,playcount,current_rank,effective_date
0,Iron Maiden,Heavy Metal,2731939,216835483,1,2024-09-20


In [16]:
daily_repeated_artists

Unnamed: 0,artist_name,tag,artist_id,max_rank,new_listeners,new_plays,consecutive_times_in_top_50,expiration_date,last_known,playcount,listeners,current_rank,effective_date
0,Iron Maiden,Heavy Metal,1,1,2731938,216835482,21,9999-12-31,Yes,216835483,2731939,1,2024-09-20


In [17]:
miss_cols = set(daily_new_artists.columns) - set(daily_repeated_artists.columns)
miss_cols

set()

In [18]:
miss_cols = set(daily_repeated_artists.columns) - set(daily_new_artists.columns)
miss_cols

{'artist_id'}

In [19]:
daily_out_artists = handle_out_artists(daily_out_artists)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  daily['new_listeners'] = 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  daily['new_plays'] = 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  daily['current_rank'] = pd.NA


In [20]:
with conn.cursor() as cur:
    
    cur.execute('SELECT * FROM "2024_domingo_nicolas_morelli_schema"."staging_daily_artists"')
    daily = cur.fetch_dataframe()
    daily = daily.rename(columns={'name': 'artist_name', 'rank': 'current_rank', 'stats_date': 'effective_date'})
    daily_new_artists = daily[~daily['artist_name'].isin(artists['artist_name'])]
    daily_repeated_artists = daily.merge(artists, on='artist_name', how='inner', suffixes=['_daily', '_old'])
    daily_out_artists = artists[~artists['artist_name'].isin(daily['artist_name'])]
    daily_new_artists = handle_new_artists(daily_new_artists)
    daily_repeated_artists = handle_repeated_artists(daily_repeated_artists)
    daily_out_artists = handle_out_artists(daily_out_artists)
    daily_new_artists['artist_id'] = pd.NA
    daily = pd.concat([daily_new_artists, daily_repeated_artists, daily_out_artists]).reset_index(drop=True)
    max_id = daily['artist_id'].max()
    print(max_id)
    for index, _ in daily[daily['artist_id'].isna()].iterrows():
        daily.loc[index, 'artist_id'] = max_id + 1
        max_id += 1

2


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  daily['new_listeners'] = 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  daily['new_plays'] = 0
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  daily['current_rank'] = pd.NA


In [21]:
daily

Unnamed: 0,artist_name,tag,listeners,playcount,current_rank,effective_date,max_rank,new_listeners,new_plays,expiration_date,last_known,consecutive_times_in_top_50,artist_id
0,Black Sabbath,Heavy Metal,3583951,156159230,2,2024-09-20,2,3583951,156159230,9999-12-31,Yes,1,3
1,Ghost,Heavy Metal,1239942,105183064,3,2024-09-20,3,1239942,105183064,9999-12-31,Yes,1,4
2,Judas Priest,Heavy Metal,1816650,88055143,4,2024-09-20,4,1816650,88055143,9999-12-31,Yes,1,5
3,Motörhead,Heavy Metal,1991508,80972958,5,2024-09-20,5,1991508,80972958,9999-12-31,Yes,1,6
4,Ozzy Osbourne,Heavy Metal,2493047,69054377,6,2024-09-20,6,2493047,69054377,9999-12-31,Yes,1,7
...,...,...,...,...,...,...,...,...,...,...,...,...,...
596,Distant,Deathcore,73658,1281125,48,2024-09-20,48,73658,1281125,9999-12-31,Yes,1,599
597,Bodysnatcher,Deathcore,63540,1292862,49,2024-09-20,49,63540,1292862,9999-12-31,Yes,1,600
598,Vulvodynia,Deathcore,46882,1301802,50,2024-09-20,50,46882,1301802,9999-12-31,Yes,1,601
599,Iron Maiden,Heavy Metal,2731939,216835483,1,2024-09-20,1,2731938,216835482,9999-12-31,Yes,21,1


In [59]:
import os
import yaml
import logging
from datetime import datetime
import redshift_connector
import awswrangler as wr
import pandas as pd


def handle_new(daily, type):
    daily = daily.rename(columns={'name': 'artist_name', 'rank': 'current_rank', 'stats_date': 'effective_date'})
    daily['max_rank'] = daily['current_rank']
    daily['new_listeners'] = daily['listeners']
    daily['new_plays'] = daily['playcount']
    daily['expiration_date'] = '9999-12-31'
    daily['last_known'] = 'Yes'

    if type == 'artists':
        daily['consecutive_times_in_top_50'] = 1

    return daily


def handle_repeated(daily, type):
    daily['new_plays'] = daily['playcount_daily'].astype(int) - daily['playcount_old'].astype(int).fillna(0)
    daily['playcount'] = daily['playcount_daily'].astype(int)
    daily['new_listeners'] = daily['listeners_daily'].astype(int) - daily['listeners_old'].astype(int).fillna(0)
    daily['listeners'] = daily['listeners_daily'].astype(int)
    daily['max_rank'] = daily[['current_rank_daily', 'max_rank']].astype(int).min(axis=1)
    daily['current_rank'] = daily['current_rank_daily'].astype(int)
    daily['expiration_date'] = '9999-12-31'
    daily['effective_date'] = daily['effective_date_daily']

    if type == 'artists':
        daily['consecutive_times_in_top_50'] += 1

    cols_to_drop = []
    for col in daily.columns:
        if '_daily' in col or '_old' in col:
            cols_to_drop.append(col)
    cols_to_drop.append('id')

    daily = daily.drop(cols_to_drop, axis=1)

    return daily


def handle_out(daily, type):
    daily['new_listeners'] = 0
    daily['new_plays'] = 0
    daily['current_rank'] = pd.NA
    daily = daily.drop('id', axis=1)

    return daily


def from_redshift_to_redshift(func):
    def wrapper(*args, **kwargs):
        pathcreds = r'C:\Users\Nico\Documents\TRABAJOS\music-dwh\airflow\dags\.env\.cfg\creds.yaml'

        with open(pathcreds, 'r') as creds:
            creds = yaml.safe_load(creds)
            host = creds['redshift']['host']
            port = creds['redshift']['port']
            db = creds['redshift']['db']
            user = creds['redshift']['user']
            password = creds['redshift']['password']
            logging.info('Credentials read.')

        conn = redshift_connector.connect(database=db, user=user, password=password, host=host, port=port)
        kwargs['conn'] = conn

        df_api = func(*args, **kwargs)

        table_name = kwargs['table_name']
        df_api = df_api.infer_objects()

        for col in df_api.columns:
            if df_api[col].dtype == 'int64':
                df_api.loc[:,col] = df_api[col].astype

        print(df_api.dtypes)
        dsfgdsfdsdfs
        wr.redshift.to_sql(df=df_api, con=conn, table=table_name, schema='2024_domingo_nicolas_morelli_schema', mode='append', lock=True, index=False)
        logging.info(f'{table_name} loaded.')

        return

    return wrapper


# TODO: Cuando ya haya pensado logica, sumarle TOP track, TOP album y tag tal vez
@from_redshift_to_redshift
def artist_dim(*args, **kwargs):
    conn = kwargs['conn']
    table_name = kwargs['table_name']

    try:
        with conn.cursor() as cur:
            cur.execute(f"""

                        SELECT *
                        FROM "2024_domingo_nicolas_morelli_schema"."{table_name}"
                        WHERE last_known = 'Yes'

                        """)
            artists = cur.fetch_dataframe()

            cur.execute(f"""UPDATE "2024_domingo_nicolas_morelli_schema"."{table_name}"
                            SET last_known = 'No',
                                expiration_date = {datetime.now().strftime('%Y-%m-%d')}
                            WHERE last_known = 'Yes' AND artist_name IN (SELECT name FROM "2024_domingo_nicolas_morelli_schema"."staging_artists_daily")
                        """)
            conn.commit()

            cur.execute('SELECT * FROM "2024_domingo_nicolas_morelli_schema"."staging_artists_daily"')
            daily = cur.fetch_dataframe()

            daily = daily.rename(columns={'name': 'artist_name', 'rank': 'current_rank', 'stats_date': 'effective_date'})

            daily_new_artists = daily[~daily['artist_name'].isin(artists['artist_name'])]
            daily_repeated_artists = daily.merge(artists, on='artist_name', how='inner', suffixes=['_daily', '_old'])
            daily_out_artists = artists[~artists['artist_name'].isin(daily['artist_name'])]

            daily_new_artists = handle_new(daily_new_artists, type='artists')
            daily_repeated_artists = handle_repeated(daily_repeated_artists, type='artists')
            daily_out_artists = handle_out(daily_out_artists, type='artists')

            daily_new_artists['artist_id'] = pd.NA

            daily = pd.concat([daily_new_artists, daily_repeated_artists, daily_out_artists]).reset_index(drop=True)

            max_id = daily['artist_id'].fillna(-1).astype(int).max()

            for index, _ in daily[daily['artist_id'].isna()].iterrows():
                daily.loc[index, 'artist_id'] = max_id + 1
                max_id += 1

    except redshift_connector.error.ProgrammingError:
        with conn.cursor() as cur:
            conn.commit()
            cur.execute(f"""
                            CREATE TABLE "2024_domingo_nicolas_morelli_schema"."{table_name}"
                            (
                              id INTEGER IDENTITY(1, 1),
                              artist_id INTEGER,
                              artist_name VARCHAR,
                              max_rank INTEGER,
                              current_rank INTEGER,
                              listeners INTEGER,
                              new_listeners INTEGER,
                              playcount INTEGER,
                              new_plays INTEGER,
                              consecutive_times_in_top_50 INTEGER,
                              effective_date VARCHAR,
                              expiration_date VARCHAR,
                              last_known VARCHAR
                            )

                        """)
            conn.commit()
            cur.execute('SELECT * FROM "2024_domingo_nicolas_morelli_schema"."staging_artists_daily"')

            daily = cur.fetch_dataframe()

            daily = handle_new(daily, type='artists')
            daily = daily.drop('tag', axis=1).drop_duplicates().reset_index(names='artist_id')

    return daily.drop('id', errors='ignore')


@from_redshift_to_redshift
def tracks_dim(*args, **kwargs):
    conn = kwargs['conn']
    table_name = kwargs['table_name']

    try:
        with conn.cursor() as cur:
            cur.execute(f"""

                        SELECT *
                        FROM "2024_domingo_nicolas_morelli_schema"."{table_name}"
                        WHERE last_known = 'Yes'

                        """)
            tracks = cur.fetch_dataframe()

            cur.execute(f"""UPDATE "2024_domingo_nicolas_morelli_schema"."{table_name}"
                            SET last_known = 'No',
                                expiration_date = {datetime.now().strftime('%Y-%m-%d')}
                            WHERE last_known = 'Yes' AND track_name || artist_id IN (SELECT DISTINCT dt.name || CAST(da.artist_id AS VARCHAR(255)) FROM "2024_domingo_nicolas_morelli_schema"."staging_tracks_daily" dt JOIN "2024_domingo_nicolas_morelli_schema"."{table_name}" da ON da.artist_name = dt.artist)
                        """)
            conn.commit()

            cur.execute('SELECT * FROM "2024_domingo_nicolas_morelli_schema"."staging_tracks_daily"')
            daily = cur.fetch_dataframe()

            daily = daily.rename(columns={'name': 'track_name', 'rank': 'current_rank', 'stats_date': 'effective_date'})

            daily_new_tracks = daily[~daily['track_name'].isin(tracks['track_name'])]
            daily_repeated_tracks = daily.merge(tracks, on='track_name', how='inner', suffixes=['_daily', '_old'])
            daily_out_tracks = tracks[~tracks['track_name'].isin(daily['track_name'])]

            daily_new_tracks = handle_new(daily_new_tracks, type='tracks')
            daily_repeated_tracks = handle_repeated(daily_repeated_tracks, type='tracks')
            daily_out_tracks = handle_out(daily_out_tracks, type='tracks')

            daily_new_tracks['track_id'] = pd.NA

            daily = pd.concat([daily_new_tracks, daily_repeated_tracks, daily_out_tracks]).reset_index(drop=True)

            max_id = daily['track_id'].fillna(0).astype(int).max()

            for index, _ in daily[daily['track_id'].isna()].iterrows():
                daily.loc[index, 'track_id'] = max_id + 1
                max_id += 1

    except redshift_connector.error.ProgrammingError:
        with conn.cursor() as cur:
            conn.commit()
            cur.execute(f"""
                            CREATE TABLE "2024_domingo_nicolas_morelli_schema"."{table_name}"
                            (
                              id INTEGER IDENTITY(1, 1),
                              track_id INTEGER,
                              artist_id INTEGER,
                              track_name VARCHAR,
                              max_rank INTEGER,
                              current_rank INTEGER,
                              listeners INTEGER,
                              new_listeners INTEGER,
                              playcount INTEGER,
                              new_plays INTEGER,
                              effective_date VARCHAR,
                              expiration_date VARCHAR,
                              last_known VARCHAR
                            )

                        """)
            cur.execute('SELECT * FROM "2024_domingo_nicolas_morelli_schema"."staging_tracks_daily"')

            daily = cur.fetch_dataframe()

            daily = handle_new(daily, type='tracks')
            daily = daily.drop_duplicates().reset_index(names='track_id')

        with conn.cursor() as cur:
            cur.execute('SELECT DISTINCT artist_name, artist_id FROM "2024_domingo_nicolas_morelli_schema"."dim_artists"')
            artists = cur.fetch_dataframe().rename(columns={'artist_name': 'artist'})

        daily = daily.merge(artists, on='artist', how='inner').drop('artist', axis=1)

    return daily.drop('id', errors='ignore')


def tag_dim(*args, **kwargs):
    # Tags
    # TODO: Pruebo consulta y sino creo la tabla
    # TODO: Me fijo los artistas, si hay alguno nuevo lo agrego y le creo un ID
    # TODO: Con SCD 2 actualizo los dias viejos y creo el nuevo con los datos del momento
    # Totalidad de escuchas, promedio de listeners del top 50, artista actual en el rank 1, cancion de algun artista del top 50 con mas escuchas
    pass
