In [12]:
from airflow import DAG
from airflow.operators.python import PythonOperator
import datetime

import extract_helper_functions as ex
import transform_data as tf
import load_update_data as load
import email_weekly_extracts as serve
import pandas as pd
import psycopg2

In [13]:
def spotify_etl_wrapper():
    '''
    Run the Spotify ETL end to end. It fetches recently played tracks, turns them into
    fact/dimension DataFrames and hands them to the PostgreSQL loader.

    Exists so the Airflow DAG can point a single PythonOperator at one callable.

    ARGUMENTS:
        None
    '''
    # Credentials workbook. The relative path resolves against the worker's current
    # working directory, not against this file.
    creds = pd.read_excel('../../logins.xlsx')

    # Each credential is stored in the row labelled 0 of its column.
    client_id = creds['client_id'][0]
    client_secret = creds['client_secret'][0]
    pg_params = [creds[col][0] for col in ('hostname', 'database', 'username', 'pwd', 'port_id')]

    spotify = ex.create_spotipy_client(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri='http://localhost:7777/callback',
        scope='user-read-recently-played',
    )
    # NOTE(review): 50 is presumably the number of recent plays requested - confirm in extract_helper_functions
    played = ex.get_recent_played_tracks(spotify, 50)

    # Strict 4-way unpack: fails loudly if create_dataframes returns a different number of tables.
    fact_play_log, dim_track, dim_artist, dim_album = tf.create_dataframes(played)

    load.load_update_tables(fact_play_log, dim_track, dim_artist, dim_album, *pg_params)

In [14]:
def email_wrapper():
    '''
    Trigger the weekly email via serve.generate_weekly_email, with credentials
    from the shared workbook.

    Exists so the Airflow DAG can point a single PythonOperator at one callable.

    ARGUMENTS:
        None
    '''
    creds = pd.read_excel('../../logins.xlsx')

    # PostgreSQL connection settings, in the positional order generate_weekly_email takes them.
    db_args = tuple(
        creds[field][0]
        for field in ('hostname', 'database', 'username', 'pwd', 'port_id')
    )

    # The same address fills the first three parameters.
    # NOTE(review): presumably sender / recipient / login - confirm in email_weekly_extracts.
    address = creds['email'][0]
    serve.generate_weekly_email(address, address, address, creds['email_pwd'][0], *db_args)

In [15]:
# One DAG runs the Spotify ETL daily; a second one sends the email weekly.
#
# Cron fields are whitespace-separated: minute hour day-of-month month day-of-week.
# Both start dates are naive datetimes, so Airflow evaluates the schedules in its
# configured default timezone (UTC unless overridden).
# NOTE(review): confirm that 6 PM / 7 PM are meant in that timezone.
DAG_START_DATE = datetime.datetime(2023, 3, 11)
ETL_SCHEDULE = '0 18 * * *'    # every day at 18:00 (6 PM)
EMAIL_SCHEDULE = '0 19 * * 0'  # every Sunday at 19:00 (7 PM); day-of-week 0 = Sunday

# Daily DAG: extract, transform and load the songs listened to.
with DAG(
    'spotify_etl',
    start_date=DAG_START_DATE,
    schedule=ETL_SCHEDULE,
    catchup=False,  # don't backfill runs missed between start_date and now
) as etl_dag:
    extract_transform = PythonOperator(
        task_id='extract_transform',
        python_callable=spotify_etl_wrapper,
    )

# Weekly DAG: generate the email.
with DAG(
    'weekly_email',
    start_date=DAG_START_DATE,
    schedule=EMAIL_SCHEDULE,
    catchup=False,  # don't backfill runs missed between start_date and now
) as email_dag:
    send_email = PythonOperator(
        task_id='send_email',
        python_callable=email_wrapper,
    )