Downloading the files

In [None]:
from google.cloud import storage
import pandas as pd
from io import BytesIO
import pandas as pd
import os
import hashlib
from dotenv import load_dotenv
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

# Connect to the bucket that holds the raw weekly chart exports.
bucket_name = 'yt-charts-raw'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)

# Read every CSV object in the bucket straight from memory (no temp files),
# producing one DataFrame per object.
df_list = [
    pd.read_csv(BytesIO(blob.download_as_bytes()))
    for blob in bucket.list_blobs()
    if blob.name.endswith('.csv')
]

# Stack all weekly files into a single frame with a fresh 0..n-1 index.
weekly_charts = pd.concat(df_list, ignore_index=True)

Published dates and tags

In [None]:
import os
from googleapiclient.discovery import build
import pandas as pd

# Load variables from a local .env file (if any) before the API key is read.
# Without this the key lookup below fails on a fresh kernel whenever the key
# lives in .env, because the first load_dotenv() call is in a later cell.
# load_dotenv() does not override variables that are already set.
load_dotenv()

youtube = build('youtube', 'v3', developerKey=os.environ['YOUTUBE_API_KEY'])

# The video ID is whatever follows "v=" in the chart's YouTube URL.
weekly_charts['track_video_id_youtube'] = weekly_charts['YouTube URL'].apply(lambda x: x.split("v=")[1])

# videos.list accepts a comma-separated list of IDs, so each unique video is
# fetched exactly once, in batches, instead of once per chart row. The same
# video appears in every week it charts, so this saves most of the API quota.
YOUTUBE_IDS_PER_REQUEST = 50

unique_video_ids = weekly_charts['track_video_id_youtube'].drop_duplicates().tolist()

# video ID -> "snippet" payload returned by the API
snippets_by_video_id = {}

for batch_start in range(0, len(unique_video_ids), YOUTUBE_IDS_PER_REQUEST):
    id_batch = unique_video_ids[batch_start:batch_start + YOUTUBE_IDS_PER_REQUEST]
    video_response = youtube.videos().list(
        part="snippet",
        id=",".join(id_batch)
    ).execute()

    # Items are keyed by their own ID rather than by position, because IDs
    # the API cannot resolve are simply absent from the response.
    for item in video_response.get('items') or []:
        snippets_by_video_id[item['id']] = item.get('snippet', {})

# Build the per-row outputs in chart order so they line up with weekly_charts.
published_dates = []
tags_data = []

for video_id in weekly_charts['track_video_id_youtube']:
    snippet = snippets_by_video_id.get(video_id)

    if snippet is None:
        print(f"No data returned for video ID: {video_id}")
        published_dates.append(None)
        continue

    published_date = snippet.get('publishedAt')
    if published_date is None:
        print(f"Could not retrieve published date for video ID: {video_id}")
    published_dates.append(published_date)

    # Keep at most the first three tags of each video, one row per tag.
    for tag in snippet.get('tags', [])[:3]:
        tags_data.append({'track_video_id_youtube': video_id, 'tags': tag})

# Explicit columns keep the schema stable even when no video has any tags.
# NOTE(review): tags are emitted once per chart row, so a video that charts in
# several weeks contributes repeated rows - confirm whether downstream code
# expects that or should call drop_duplicates().
tags_df = pd.DataFrame(tags_data, columns=['track_video_id_youtube', 'tags'])

weekly_charts['yt_published_dates'] = published_dates


Spotify API calls and creating the fact and dimension tables

In [None]:


# Pull variables from a local .env file into the process environment.
load_dotenv()

# Build the Spotify client using the client-credentials flow; the ID and
# secret are read from the environment.
spotify_credentials = SpotifyClientCredentials(
    client_id=os.getenv("SPOTIPY_CLIENT_ID"),
    client_secret=os.getenv("SPOTIPY_CLIENT_SECRET"),
)
sp = spotipy.Spotify(auth_manager=spotify_credentials)

def generate_id(value):
    """Return the hexadecimal MD5 digest of ``str(value)``.

    The value is converted to text with ``str`` and encoded with the default
    encoding before hashing, so two values share an ID exactly when their
    string forms are equal.
    """
    text = str(value)
    digest = hashlib.md5(text.encode())
    return digest.hexdigest()

def get_spotify_id(track):
    """Look up a track on Spotify and return the ID of the first hit.

    Queries longer than 100 characters are cut to their first 100 characters
    before searching. Returns ``None`` when the search yields no tracks. Any
    exception is printed and also results in ``None``.
    """
    try:
        query = track[:100] if len(track) > 100 else track
        matches = sp.search(q=query, type='track', limit=1)['tracks']['items']
        if not matches:
            return None
        return matches[0]['id']
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def get_artist_id(artist):
    """Look up an artist on Spotify and return the ID of the first hit.

    Returns ``None`` when the search yields no artists. Any exception is
    printed and also results in ``None``.
    """
    try:
        matches = sp.search(q=artist, type='artist', limit=1)['artists']['items']
        if not matches:
            return None
        return matches[0]['id']
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

def get_genre(spotify_id):
    """Return the first genre Spotify lists for the given artist ID.

    Returns ``None`` when ``spotify_id`` is ``None`` or the artist has no
    genres. Any exception raised by the lookup is printed and also results in
    ``None``.
    """
    if spotify_id is None:
        return None
    try:
        genres = sp.artist(spotify_id)['genres']
        if genres:
            return genres[0]
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

# Read your dataframe (already in your code as weekly_charts)

# Prepare fact_chart: one row per chart entry, carrying the chart metrics.
fact_chart = weekly_charts[['Rank', 'Previous Rank', 'week_open', 'week_close', 'Weeks on Chart', 'Views', 'Weekly Growth']].copy()
# fact_id is the MD5 of str(tuple(row)) over the seven metric columns above.
# NOTE(review): the hash depends on how each value renders as text, so a change
# in column dtype or library version could change the IDs - confirm if the IDs
# must stay stable across runs.
fact_chart['fact_id'] = fact_chart.apply(lambda x: generate_id(tuple(x)), axis=1)
# Foreign keys are hashed from the same column pairs, in the same order, that
# dim_track and dim_artist hash below, so the keys are meant to match.
fact_chart['dim_track_id'] = weekly_charts.apply(lambda x: generate_id((x['Track Name'], x['track_video_id_youtube'])), axis=1)
fact_chart['dim_artist_id'] = weekly_charts.apply(lambda x: generate_id((x['Artist Names'], x['main_artist'])), axis=1)


# Prepare dim_track: one row per distinct (Track Name, video ID) pair.
dim_track = weekly_charts[['Track Name', 'track_video_id_youtube']].drop_duplicates().copy()
dim_track['dim_track_id'] = dim_track.apply(lambda x: generate_id(tuple(x)), axis=1)

# Get Spotify track IDs (searches by track name only; missing names are skipped)
dim_track['spotify_track_id'] = dim_track['Track Name'].apply(lambda x: get_spotify_id(x) if pd.notna(x) else None)

# Prepare dim_artist: one row per distinct (Artist Names, main_artist) pair.
dim_artist = weekly_charts[['Artist Names', 'main_artist']].drop_duplicates().copy()
dim_artist['dim_artist_id'] = dim_artist.apply(lambda x: generate_id(tuple(x)), axis=1)

# Get Spotify artist IDs (searches by main_artist; missing names are skipped)
dim_artist['artist_id_spotify'] = dim_artist['main_artist'].apply(lambda x: get_artist_id(x) if pd.notna(x) else None)

# Prepare dim_genre: built column by column from dim_artist, so it has one row
# per dim_artist row. dim_genre_id is the hash of that row's dim_artist_id.
dim_genre = pd.DataFrame()
dim_genre['dim_genre_id'] = dim_artist['dim_artist_id'].apply(generate_id)

# Get genres: get_genre returns only the first genre listed for the artist.
# The None guard in the lambda repeats a check get_genre already performs.
dim_genre['genre_name'] = dim_artist['artist_id_spotify'].apply(lambda x: get_genre(x) if x is not None else None)
dim_genre['dim_artist_id'] = dim_artist['dim_artist_id']


Verifications and tests

In [None]:
# Print the row count of each table, one per line, in the order listed.
dfs = [dim_genre, dim_artist, dim_track, fact_chart]

print(*(len(table) for table in dfs), sep="\n")

In [None]:
# Referential-integrity and uniqueness checks on the tables built above.

# Every track key used by the fact table must exist in dim_track.
track_keys_known = fact_chart['dim_track_id'].isin(dim_track['dim_track_id'])
assert track_keys_known.all()

# Every artist key used by the fact table must exist in dim_artist.
artist_keys_known = fact_chart['dim_artist_id'].isin(dim_artist['dim_artist_id'])
assert artist_keys_known.all()

# Every artist key referenced by dim_genre must exist in dim_artist.
genre_artist_keys_known = dim_genre['dim_artist_id'].isin(dim_artist['dim_artist_id'])
assert genre_artist_keys_known.all()

# No two fact rows may share a fact_id.
assert fact_chart['fact_id'].is_unique

print("All tests passed!")

Uploading

In [None]:
from google.cloud import storage
# Upload every CSV found in the local trusted folder to the trusted bucket,
# using the local file name as the object name.
# NOTE(review): no cell shown here writes the fact/dim tables into
# data/trusted - confirm that step happens elsewhere before this cell runs.
storage_client = storage.Client()
bucket = storage_client.bucket('yt-charts-trusted')

local_dir = "data/trusted"
csv_names = [entry for entry in os.listdir(local_dir) if entry.endswith('.csv')]

for file_name in csv_names:
    bucket.blob(file_name).upload_from_filename(os.path.join(local_dir, file_name))

Check the uploaded files

In [None]:
from google.cloud import storage
from dotenv import load_dotenv
import os

# Load variables from a local .env file, if one exists.
load_dotenv()

# List every object in the trusted bucket and report how many there are.
bucket_name = 'yt-charts-trusted'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blobs = bucket.list_blobs()

print(f"Files in {bucket_name}:")

# count stays 0 when the bucket is empty; otherwise it ends at the last position.
count = 0
for count, blob in enumerate(blobs, start=1):
    print(blob.name)

print(f"{count} files")