In [0]:
import boto3
import time
import psycopg2
from pyspark.sql.functions import floor, when, col, length, to_date
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.utils import AnalysisException
from datetime import datetime

In [0]:
# Notebook configuration. Empty strings are placeholders to be filled in before running.
# NOTE(review): AWS_ACCESS_KEY and AWS_SECRET_KEY are used when the DynamoDB client is
# created further down but are not defined in this cell — confirm where they come from.
RAW_MOUNT = '/mnt/raw' # mount point of the raw data; artists.parquet and <artist>/albums|tracks.parquet are read from here
TRANSFORMED_MOUNT = '/mnt/transformed' # mount point under which the transformed artists/albums/tracks tables are written
ARTISTS_DDB_TABLE = '' # DynamoDB table of artists; items carry `name` plus the `is_extracted` / `is_transformed` flags
SERVING_BUCKET = '' # NOTE(review): not referenced in the cells shown here — presumably the bucket behind TRANSFORMED_MOUNT; confirm

# Redshift connection settings.
# NOTE(review): none of these are referenced in the cells shown here — presumably used by a later load step; confirm.
REDSHIFT_USER = ''
REDSHIFT_PASSWORD = ''
REDSHIFT_HOST = ''
REDSHIFT_PORT = 5439
REDSHIFT_DATABASE = ''
REDSHIFT_SCHEMA = ''
IAM_ROLE = ''

In [0]:
@udf(returnType=IntegerType())
def get_minute(ms):
    """Spark UDF: number of whole minutes in a millisecond duration.

    Args:
        ms: Duration in milliseconds; any value accepted by ``int()``.
            ``None`` (a null column value) is allowed.

    Returns:
        ``int(ms) // 60000``, or ``None`` when ``ms`` is ``None``.

    Raises:
        ValueError: If ``ms`` is not ``None`` and cannot be converted by ``int()``.
    """
    # Null column values reach a Python UDF as None; int(None) would raise
    # TypeError and fail the task, so propagate the null instead.
    if ms is None:
        return None
    minute_in_milliseconds = 60000
    return int(ms) // minute_in_milliseconds
    
@udf(returnType=IntegerType())
def get_second(ms):
    """Spark UDF: seconds component (0-59) of a millisecond duration.

    Args:
        ms: Duration in milliseconds; any value accepted by ``int()``.
            ``None`` (a null column value) is allowed.

    Returns:
        ``(int(ms) // 1000) % 60``, or ``None`` when ``ms`` is ``None``.

    Raises:
        ValueError: If ``ms`` is not ``None`` and cannot be converted by ``int()``.
    """
    # Null column values reach a Python UDF as None; int(None) would raise
    # TypeError and fail the task, so propagate the null instead.
    if ms is None:
        return None
    second_in_milliseconds = 1000
    # Whole seconds elapsed, reduced modulo 60 to drop the full minutes.
    return (int(ms) // second_in_milliseconds) % 60

@udf(returnType=DateType())
def format_date(date):
    """Spark UDF: parse a date string of year, year-month or full-date form.

    The format is chosen from the string length: 4 characters -> ``%Y``,
    7 characters -> ``%Y-%m``, anything else -> ``%Y-%m-%d``.

    Args:
        date: Date string, or ``None`` (a null column value).

    Returns:
        A ``datetime.date``, or ``None`` when ``date`` is ``None``.

    Raises:
        ValueError: If the string does not match the format chosen for its length.
    """
    # Null column values reach a Python UDF as None; len(None) would raise
    # TypeError and fail the task, so propagate the null instead.
    if date is None:
        return None

    if len(date) == 4:
        pattern = '%Y'
    elif len(date) == 7:
        pattern = '%Y-%m'
    else:
        pattern = '%Y-%m-%d'

    return datetime.strptime(date, pattern).date()
    

In [0]:
def transform_artist(file_path):
    """Read the artist parquet data at `file_path` and project the output columns.

    `popularity` and `followers` are cast to INTEGER; `id`, `name`, `genres`,
    `href` and `image` are selected as they are. Uses the notebook-global
    `spark` session.

    Args:
        file_path: Location of the artist parquet data.

    Returns:
        DataFrame with the seven columns listed above, in that order.
    """
    select_exprs = [
        'id',
        'name',
        'CAST(popularity AS INTEGER)',
        'genres',
        'CAST(followers AS INTEGER)',
        'href',
        'image',
    ]
    return spark.read.parquet(file_path).selectExpr(*select_exprs)


def transform_album(file_path):
    """Read the album parquet data at `file_path`, filter it and project the output columns.

    Only rows satisfying `total_tracks != -1` are kept. `total_tracks` and
    `popularity` are cast to INTEGER; the remaining columns are selected as
    they are. Uses the notebook-global `spark` session.

    Args:
        file_path: Location of the album parquet data.

    Returns:
        DataFrame with columns id, name, artist_name, total_tracks, label,
        popularity, href, image.
    """
    select_exprs = [
        'id',
        'name',
        'artist_name',
        'CAST(total_tracks AS INTEGER)',
        'label',
        'CAST(popularity AS INTEGER)',
        'href',
        'image',
    ]
    albums = spark.read.parquet(file_path)
    kept_albums = albums.filter(col('total_tracks') != -1)
    return kept_albums.selectExpr(*select_exprs)

def transform_track(file_path):
    """Read the track parquet data at `file_path`, derive duration/date columns and project.

    Steps:
      * keep only rows satisfying `duration_ms != -1`;
      * add `minute` = floor(duration_ms / 60000) and
        `second` = floor(duration_ms / 1000) % 60;
      * replace `release_date` with a date parsed according to the string
        length (4 -> "yyyy", 7 -> "yyyy-MM", otherwise "yyyy-MM-dd");
      * select the output columns in their final order, casting `popularity`
        and `duration_ms` to INTEGER and `explicit` to BOOLEAN.

    Uses the notebook-global `spark` session.

    Args:
        file_path: Location of the track parquet data.

    Returns:
        DataFrame with the ordered, typed track columns.
    """
    duration = col('duration_ms')

    # Pick the parse pattern from the length of the raw release_date string.
    parsed_release_date = (
        when(length('release_date') == 4, to_date("release_date", "yyyy").cast(DateType()))
        .when(length("release_date") == 7, to_date("release_date", "yyyy-MM").cast(DateType()))
        .otherwise(to_date("release_date", "yyyy-MM-dd").cast(DateType()))
    )

    tracks = spark.read.parquet(file_path).filter(duration != -1)
    enriched_tracks = (
        tracks
        .withColumn('minute', floor(duration / 60000))
        .withColumn('second', floor(duration / 1000) % 60)
        .withColumn('release_date', parsed_release_date)
    )

    select_exprs = [
        'id',
        'name',
        'CAST(popularity AS INTEGER)',
        'CAST(duration_ms AS INTEGER)',
        'minute',
        'second',
        'CAST(explicit AS BOOLEAN)',
        'artist_id',
        'album_id',
        'release_date',
        'release_date_precision',
        'image',
        'href',
    ]
    return enriched_tracks.selectExpr(*select_exprs)

In [0]:
#Transforming Artists
# NOTE(review): mode('append') adds the artist rows again on every run of this cell —
# confirm that re-running cannot duplicate data in the `artists` table.
transformed_artists = transform_artist(f'{RAW_MOUNT}/artists.parquet').coalesce(1)
transformed_artists.write.mode('append').option('path', f'{TRANSFORMED_MOUNT}/artists').saveAsTable('artists')

# NOTE(review): AWS_ACCESS_KEY, AWS_SECRET_KEY, get_all_items and update_is_transformed
# are not defined in the cells shown here — they must be in scope before this cell runs.
dynamo_db = boto3.client('dynamodb', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name='ap-south-1')

# Artists whose raw data has been extracted but not yet transformed.
to_extract = [
    item['name']['S']
    for item in get_all_items(dynamo_db, table_name=ARTISTS_DDB_TABLE)
    if item['is_extracted']['BOOL'] and not item['is_transformed']['BOOL']
]

#Transforming Albums and Tracks
for artist in to_extract:
    ARTIST_NAME = artist
    print(f'transforming data of {ARTIST_NAME}')
    ARTIST_PATH = f'{RAW_MOUNT}/{ARTIST_NAME}'
    ALBUMS_PATH = f'{ARTIST_PATH}/albums.parquet'
    TRACKS_PATH = f'{ARTIST_PATH}/tracks.parquet'

    try:
        transformed_albums = transform_album(ALBUMS_PATH).coalesce(1)
        transformed_albums.write.mode('append').option('path', f'{TRANSFORMED_MOUNT}/albums').saveAsTable('albums')
    except AnalysisException as ae:
        # A missing albums file is expected for some artists; any other
        # analysis error must surface, otherwise the artist would be flagged
        # as transformed below even though its data was never written.
        if 'Path does not exist' in str(ae):
            print(f"\t-{ARTIST_NAME} doesn't have albums")
        else:
            raise

    try:
        transformed_tracks = transform_track(TRACKS_PATH).coalesce(1)
        transformed_tracks.write.mode('append').option('path', f'{TRANSFORMED_MOUNT}/tracks').saveAsTable('tracks')
    except AnalysisException as ae:
        # Same policy as for albums: tolerate only a missing input path.
        if 'Path does not exist' in str(ae):
            print(f"\t-{ARTIST_NAME} doesn't have tracks")
        else:
            raise
    
    # Set `is_transformed` flag to True (reached only when no unexpected error occurred)
    update_is_transformed(dynamo_db, table_name=ARTISTS_DDB_TABLE, key=ARTIST_NAME, is_transformed=True)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- total_tracks: integer (nullable = true)
 |-- label: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- href: string (nullable = true)
 |-- image: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- minute: long (nullable = true)
 |-- second: long (nullable = true)
 |-- explicit: boolean (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- release_date_precision: string (nullable = true)
 |-- image: string (nullable = true)
 |-- href: string (nullable = true)

