## Install Requests

In [None]:
%%bash
pip install requests

## SANITY CHECK - Search for a song

In [5]:
import requests

# Token from Genius API
access_token = "0MSSVdpMbNPuX4u5LhP0sFUd5sajyinz28KOWgbs-F9CfrWE4-M_14kZVJckK3GP"

headers = {
    "Authorization": f"Bearer {access_token}"
}

def search_song(query):
    base_url = "https://api.genius.com"
    search_url = f"{base_url}/search"
    params = {"q": query}
    response = requests.get(search_url, params=params, headers=headers)
    return response.json()

# Search for a song
results = search_song("Welcome to New York Taylor Swift")
for hit in results["response"]["hits"][:3]:
    print("Title:", hit["result"]["full_title"])
    print("Genius URL:", hit["result"]["url"])
    print()


Title: Welcome to New York by Taylor Swift
Genius URL: https://genius.com/Taylor-swift-welcome-to-new-york-lyrics

Title: ​portorosso’s DISCONTINUED 2023 Listening Log by still-life starlet
Genius URL: https://genius.com/Still-life-starlet-portorossos-discontinued-2023-listening-log-annotated

Title: ​youth group - background music by Josiah Botting
Genius URL: https://genius.com/Josiah-botting-youth-group-background-music-annotated



# Set Up Table and Bucket

In [6]:
import boto3

# DDB table
dynamodb = boto3.resource('dynamodb')

table_name = 'genius-ddb-table'

table = dynamodb.create_table(
    TableName=table_name,
    KeySchema=[
        {
            'AttributeName': 'user_id',
            'KeyType': 'HASH'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'user_id',
            'AttributeType': 'S'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 1,
        'WriteCapacityUnits': 1
    }
)

print(f"DDB table '{table_name}' created!")



DDB table 'genius-ddb-table' created!


In [7]:
import boto3

s3 = boto3.client('s3')

bucket_name = 'genius-s3-bucket-sowder'

s3.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={'LocationConstraint': 'us-east-2'}
)

print(f"S3 bucket '{bucket_name}' created!")

S3 bucket 'genius-s3-bucket-sowder' created!


# Set up Lambda Function Connection

In [8]:
import boto3
import json

# SQS setup
sqs = boto3.client('sqs')
queue_name = 'genius-sqs-queue'

In [9]:
# create queue
try:
    response = sqs.create_queue(
        QueueName=queue_name,
        Attributes={
            'VisibilityTimeout': '60'
        }
    )
    queue_url = response['QueueUrl']
    print(f"SQS queue created: {queue_url}")
except sqs.exceptions.QueueNameExists:
    queue_url = [url for url in sqs.list_queues()['QueueUrls'] if queue_name in url][0]
    print(f"SQS queue already exists: {queue_url}")

SQS queue created: https://sqs.us-east-2.amazonaws.com/943814146063/genius-sqs-queue


In [10]:
# deploy and connect lambda function
lambda_client = boto3.client('lambda')
iam = boto3.client('iam')
sqs = boto3.client('sqs')

function_name = 'genius_lambda_function'
zip_path = 'genius_lambda_package/genius_lambda.zip'
sqs_queue_url = queue_url
role = iam.get_role(RoleName='GeniusLambdaRole')['Role']['Arn']

with open(zip_path, 'rb') as f:
    zipped_code = f.read()

In [11]:
# create lambda function
try:
    response = lambda_client.create_function(
        FunctionName=function_name,
        Runtime='python3.9',
        Role=role,
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': zipped_code},
        Timeout=10
    )
    print("lambda function created!")
except lambda_client.exceptions.ResourceConflictException:
    print("lambda function already exists")
    response = lambda_client.update_function_code(
        FunctionName=function_name,
        ZipFile=zipped_code
    )

lambda function created!


In [12]:
# get ARN for trigger
sqs_attrs = sqs.get_queue_attributes(
    QueueUrl=sqs_queue_url,
    AttributeNames=['QueueArn']
)
sqs_arn = sqs_attrs['Attributes']['QueueArn']

In [13]:
# check that the function is there

lambda_client = boto3.client('lambda', region_name='us-east-2')
functions = lambda_client.list_functions()

for f in functions['Functions']:
    print(f['FunctionName'])

genius_lambda


In [14]:
# create SQS trigger
try:
    lambda_client.create_event_source_mapping(
        EventSourceArn=sqs_arn,
        FunctionName=function_name,
        Enabled=True,
        BatchSize=10
    )
    print("trigger creation successful")
except lambda_client.exceptions.ResourceConflictException:
    print("trigger already exists")

trigger creation successful


## Set Up Lambda Environment Variable

In [15]:
import boto3

lambda_client = boto3.client('lambda')
function_name = 'genius_lambda_function'

with open('genius_lambda.zip', 'rb') as f:
    zipped_code = f.read()

lambda_client.update_function_code(
    FunctionName=function_name,
    ZipFile=zipped_code
)

{'ResponseMetadata': {'RequestId': '2fcf5d00-21f7-4d2c-9070-fefa69a961e5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Fri, 30 May 2025 03:15:41 GMT',
   'content-type': 'application/json',
   'content-length': '1337',
   'connection': 'keep-alive',
   'x-amzn-requestid': '2fcf5d00-21f7-4d2c-9070-fefa69a961e5'},
  'RetryAttempts': 0},
 'FunctionName': 'genius_lambda',
 'FunctionArn': 'arn:aws:lambda:us-east-2:943814146063:function:genius_lambda',
 'Runtime': 'python3.9',
 'Role': 'arn:aws:iam::943814146063:role/GeniusLambdaRole',
 'Handler': 'lambda_function.lambda_handler',
 'CodeSize': 1809631,
 'Description': '',
 'Timeout': 10,
 'MemorySize': 128,
 'LastModified': '2025-05-30T03:15:41.000+0000',
 'CodeSha256': 'v7SM/JJy2Nk5ECju5NCQRA7rDj0l6rcLC25V0LIMeZg=',
 'Version': '$LATEST',
 'TracingConfig': {'Mode': 'PassThrough'},
 'RevisionId': 'a05cf0d4-a04c-42fb-bca7-16052b07b4cb',
 'State': 'Active',
 'LastUpdateStatus': 'InProgress',
 'LastUpdateStatusReason': 'The function is 

In [16]:
import boto3

lambda_client = boto3.client('lambda')
function_name = 'genius_lambda_function' 

response = lambda_client.update_function_configuration(
    FunctionName=function_name,
    Environment={
        'Variables': {
            'GENIUS_API_TOKEN': '0MSSVdpMbNPuX4u5LhP0sFUd5sajyinz28KOWgbs-F9CfrWE4-M_14kZVJckK3GP'
        }
    }
)

print("Environment variable updated!")


Environment variable updated!


### Sanity Check

In [20]:
import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'hhttps://sqs.us-east-2.amazonaws.com/943814146063/genius-sqs-queue'

message_body = {
    "track_id": "123",
    "title": "Welcome to New York",
    "artist": "Taylor Swift"
}

response = sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps(message_body)
)

print("Message sent to SQS:", response['MessageId'])




Message sent to SQS: aa5fcd1b-f237-4f93-ae25-f2daa8a5524f


In [21]:
import boto3

s3 = boto3.client('s3')
bucket_name = 'genius-s3-bucket-sowder'

response = s3.list_objects_v2(Bucket='genius-s3-bucket-sowder')
for obj in response.get('Contents', []):
    print(obj['Key'])

This confirms my entire pipeline is working end-to-end:
- Lambda is being triggered by SQS
- Lyrics are being scraped and stored in S3

# Use Dask to clean and match data

### 1. Set up AWS clients

In [70]:
import boto3
import json

region = 'us-east-2'
sqs_queue_url = 'https://sqs.us-east-2.amazonaws.com/943814146063/genius-sqs-queue'
s3_bucket = 'genius-s3-bucket-sowder'
dynamodb_table = 'genius-ddb-table'

s3 = boto3.client('s3', region_name=region)
sqs = boto3.client('sqs', region_name=region)
dynamodb = boto3.resource('dynamodb', region_name=region)
lambda_client = boto3.client('lambda', region_name=region)

In [78]:
import dask.dataframe as dd

df = dd.read_json(
    "s3://genius-s3-bucket-sowder/lyrics/*.json",
    storage_options={
        'client_kwargs': {'region_name': 'us-east-2'}
    }
)
df.head()



Unnamed: 0,track_id,title,artist,url,lyrics
0,123,Welcome to New York,Taylor Swift,https://genius.com/Taylor-swift-welcome-to-new...,Lyrics not found.


### 2. Batch send messages from CSV using Dask

In [63]:
import dask.dataframe as dd

csv_df = dd.read_csv('final_joined_table.csv')
csv_df = csv_df[['track_id', 'title', 'artist']].dropna().drop_duplicates(subset=['title', 'artist'])

In [68]:
import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-2.amazonaws.com/943814146063/genius-sqs-queue'

for i, row in enumerate(csv_df.head(6000).itertuples(index=False)):
    message = {
        "track_id": getattr(row, "track_id", ""),
        "title": getattr(row, "title", ""),
        "artist": getattr(row, "artist", "")
    }
    response = sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))
    print(f"{i+1}: Sent '{message['title']}' by {message['artist']} — MsgID: {response['MessageId']}")

1: Sent 'Collapsing New People' by Fad Gadget — MsgID: a9427228-ca88-499b-8ef2-3c910e27cc33
2: Sent 'We Rule The Fucking Land' by Zimmers Hole — MsgID: a33eb62b-6c27-40cf-8c76-f42e218dd538
3: Sent 'Blood' by Candiria — MsgID: b8ebd713-8434-413b-966e-74b50473adc0
4: Sent 'Deathly' by Aimee Mann — MsgID: fa6de2e7-ed42-45b6-89e0-d8e35d5a1374
5: Sent 'My Definition Of A Boombastic Jazz Style' by Dream Warriors — MsgID: e000c75a-1c17-486e-adb0-e119ea3dac35
6: Sent 'Take A Load Off' by Bubba Sparxxx — MsgID: 97a1a26d-8968-40c5-a067-27e2ebb365f4
7: Sent 'Rapper's Delight' by The Sugarhill Gang — MsgID: f6b250cc-c8d0-429a-912e-b0e64e02ca5d
8: Sent 'So I Thought' by Flyleaf — MsgID: cc9938f2-4ead-4e96-adfb-bb30d63991b3
9: Sent 'Twentytwofourteen' by The Album Leaf — MsgID: ed3edcba-1387-41e5-a92b-88df3e35364e
10: Sent '21st Century Boy' by Willy Mason — MsgID: ad6daa96-16ac-4095-a3d9-2bf8805d1c16
11: Sent 'Edge Hill' by Groove Armada — MsgID: c6780ea9-f219-4d15-9519-5e0f0fe32553
12: Sent 'When 

### 3. List objects in S3

In [69]:
import boto3

s3 = boto3.client('s3')
bucket_name = 'genius-s3-bucket-sowder'

response = s3.list_objects_v2(Bucket='genius-s3-bucket-sowder')
for obj in response.get('Contents', []):
    print(obj['Key'])

lyrics/123.json
lyrics/TRAAUFV128EF35F02C.json
lyrics/TRABAFJ128F42AF24E.json
lyrics/TRABOGK128F9348805.json
lyrics/TRABOPN128F9326C56.json
lyrics/TRABZQR12903CCBE06.json
lyrics/TRACLWY128F145F848.json
lyrics/TRADFII128F931FC9F.json
lyrics/TRADIFK128F92D35D3.json
lyrics/TRAFAEO128F92E9A19.json
lyrics/TRAGFVT128F92ECB5D.json
lyrics/TRAGSSJ128F92FA76C.json
lyrics/TRAGZXB128F4288D88.json
lyrics/TRAIBXJ128F4260700.json
lyrics/TRAIPST128F9302AE9.json
lyrics/TRAKEYJ128F429B2C8.json
lyrics/TRALASU128E07899DB.json
lyrics/TRAMDRT12903CA121C.json
lyrics/TRAMIHQ128F4262218.json
lyrics/TRAMTMJ128E0782829.json
lyrics/TRAMUZE128C7196B34.json
lyrics/TRAMYXD128F42781AF.json
lyrics/TRANSAW128F422CA49.json
lyrics/TRANVXK128F4239375.json
lyrics/TRAPHYP128F9341FE1.json
lyrics/TRAPVUT128F4267198.json
lyrics/TRAQJMP128F92E3821.json
lyrics/TRAQZJA128F1484D43.json
lyrics/TRARDUF128F423EF31.json
lyrics/TRARHCB128F148D1B0.json
lyrics/TRASNQE128F933323D.json
lyrics/TRASRGN12903CFF756.json
lyrics/TRASUWA128F933E6

### 4. Load S3 lyrics into Dask

In [71]:
import dask.dataframe as dd

df = dd.read_json(
    "s3://genius-s3-bucket-sowder/lyrics/*.json",
    storage_options={
        'client_kwargs': {'region_name': 'us-east-2'}
    }
)
df.head()

Unnamed: 0,track_id,title,artist,url,lyrics
0,123,Welcome to New York,Taylor Swift,https://genius.com/Taylor-swift-welcome-to-new...,Lyrics not found.


In [44]:
from dask import delayed
import fsspec
import json
import dask.bag as db

storage_options = {
    "client_kwargs": {
        "region_name": "us-east-2"
    }
}

fs = fsspec.filesystem("s3", **storage_options)
files = fs.glob("genius-s3-bucket-sowder/lyrics/*.json")

@delayed
def load_json(path):
    with fs.open(path) as f:
        try:
            return json.load(f)
        except Exception as e:
            print(f"Error reading {path}: {e}")
            return None

data = [load_json(f) for f in files]
bag = db.from_delayed(data)
bag = bag.filter(lambda x: x is not None and isinstance(x, dict) and "track_id" in x)

meta = {
    "track_id": str,
    "title": str,
    "artist": str,
    "url": str,
    "lyrics": str
}

df = bag.to_dataframe(meta=meta)
df.compute().head(10)


KeyError: 0

In [32]:
lyrics_df.compute().head(10)

Unnamed: 0,track_id,title,artist,url,lyrics
0,123,Welcome to New York,Taylor Swift,https://genius.com/Taylor-swift-welcome-to-new...,Lyrics not found.


### 5. Clean lyrics

In [26]:
import re
from nltk.corpus import stopwords

stop_words = set(stopwords.words("english"))

def clean_lyrics_partition(pdf):
    def clean(text):
        if not isinstance(text, str):
            return ""
        text = re.sub(r"[^\w\s]", "", text.lower())
        tokens = text.split()
        return " ".join([word for word in tokens if word not in stop_words])

    pdf["clean_lyrics"] = pdf["lyrics"].apply(clean)
    return pdf

meta = lyrics_df.head(1).assign(clean_lyrics="")
df_cleaned = lyrics_df.map_partitions(clean_lyrics_partition, meta=meta)

In [29]:
df_cleaned.head()


Unnamed: 0,track_id,title,artist,url,lyrics,clean_lyrics
0,123,Welcome to New York,Taylor Swift,https://genius.com/Taylor-swift-welcome-to-new...,Lyrics not found.,lyrics found


### 6. Join with Spotify CSV

In [27]:
csv_df = dd.read_csv("final_joined_table.csv")
merged = df_cleaned.merge(csv_df, on=["title", "artist"], how="inner")
merged = merged.rename(columns={"track_id_y": "track_id"}).drop(columns=["track_id_x"])

In [28]:
# Preview
print(merged.columns)
print(merged.head())


Index(['title', 'artist', 'url', 'lyrics', 'clean_lyrics', 'track_id',
       'spotify_id', 'emotion', 'id', 'acousticness', 'analysis_url',
       'danceability', 'duration', 'energy', 'instrumentalness', 'key',
       'liveness', 'loudness', 'mode', 'speechiness', 'tempo',
       'time_signature', 'valence'],
      dtype='object')
Empty DataFrame
Columns: [title, artist, url, lyrics, clean_lyrics, track_id, spotify_id, emotion, id, acousticness, analysis_url, danceability, duration, energy, instrumentalness, key, liveness, loudness, mode, speechiness, tempo, time_signature, valence]
Index: []

[0 rows x 23 columns]


In [7]:
df.shape[0].compute()

2831

In [40]:
import dask.dataframe as dd

df = dd.read_json(
    "s3://genius-s3-bucket-sowder/lyrics/*.json",
    storage_options={"anon": False},
    dtype={"track_id": "object", "title": "object", "artist": "object", "url": "object", "lyrics": "object"}
)
df.head()



Unnamed: 0,track_id,title,artist,url,lyrics
0,123,Welcome to New York,Taylor Swift,https://genius.com/Taylor-swift-welcome-to-new...,Lyrics not found.


In [1]:
import dask.dataframe as dd
import pandas as pd
import re
from nltk.corpus import stopwords

# Load stopwords once
stop_words = set(stopwords.words("english"))

# Reload the data cleanly (assumes you've already confirmed this loads correctly)
df = dd.read_json(
    "s3://genius-bucket-654654514107/lyrics/*.json",
    storage_options={"anon": False},
    dtype={"track_id": "object", "title": "object", "artist": "object", "url": "object", "lyrics": "object"}
)

# Define a safe cleaning function
def clean_lyrics_partition(pdf):
    def clean(text):
        if not isinstance(text, str):
            return ""
        text = re.sub(r"[^\w\s]", "", text.lower())
        tokens = text.split()
        return " ".join([word for word in tokens if word not in stop_words])
    
    pdf["clean_lyrics"] = pdf["lyrics"].apply(clean)
    return pdf

# Apply map_partitions with correct meta
meta = df.head(1).assign(clean_lyrics="")  # Use actual meta schema
df_cleaned = df.map_partitions(clean_lyrics_partition, meta=meta)

# Safely inspect first few rows
result = df_cleaned.get_partition(0).compute()
print(result[["title", "artist", "clean_lyrics"]].head())


                 title        artist  clean_lyrics
0  Welcome to New York  Taylor Swift  lyrics found
