<a href="https://colab.research.google.com/github/slazur83/Tableau/blob/main/MusicTracks_exporter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import glob
import shutil
import logging
import time
import re
import csv
import datetime
from datetime import datetime
from google.colab import drive

import pandas as pd
import json
import requests
from tabulate import tabulate

In [None]:
drive_path = '/content/drive/MyDrive'

if not os.path.exists(drive_path):
    drive.mount('/content/drive')

config_file_path = os.path.join(drive_path, "Skrypty", "config.json")

try:
    with open(config_file_path, 'r') as config_file:
        config = json.load(config_file)
        print("Configuration file loaded successfully.")
except FileNotFoundError:
    print(f"Error: Configuration file not found at {config_file_path}.")
except json.JSONDecodeError:
    print("Error: Failed to parse the configuration file. Please check the file's content.")

In [None]:
SPECIAL_CHARACTER_MAPPING = {
    "Chylinska": "Chylińska",
    "Toure": "Touré",
    "Ashford Simpson": "Ashford & Simpson",
    "Anderson Paak": "Anderson .Paak",
    "Anderson .paak": "Anderson .Paak",
    "Axwell Ingrosso": "Axwell /\ Ingrosso",
    "Zodiac Mindwarp The Love Reaction": "Zodiac Mindwarp & The Love Reaction",
    "Durand Jones The Indications" : "Durand Jones & The Indications"
}

def standardize_artist_and_track(artist, track):

    if not pd.notna(artist) or not pd.notna(track):
        return artist, track

    if artist:
        artist = ' '.join(artist.split())
        artist = artist.replace('St ', 'St. ').replace(' And ', ' & ').replace(' and ', ' & ').replace(' AND ', ' & ')
        artist = re.sub(r'(?<=[a-z])(?=[A-Z])', ' ', artist)

        for original, replacement in SPECIAL_CHARACTER_MAPPING.items():
            artist = artist.replace(original, replacement)

        if artist.lower().endswith(' vevo'):
            artist = artist[:-5].strip()

        words = artist.split()
        artist = ' '.join(word.capitalize() if word.lower() != '&' else '&' for word in words)

    if track:
        track = ' '.join(track.strip().title().split())

    feat_match = re.search(r'\(feat\. (.*?)\)', track, re.IGNORECASE)
    if feat_match:
        featured_artist = feat_match.group(1).strip()
        if featured_artist.lower() not in artist.lower():
            artist = f"{artist}, {featured_artist}"
        track = re.sub(r'\(feat\. .*?\)', '', track, flags=re.IGNORECASE).strip()

    return artist, track


def remove_duplicates(df, subset_columns):

    before_count = df.shape[0]
    print(f"Before removing duplicates: {before_count} rows")

    df_cleaned = df.drop_duplicates(subset=subset_columns)

    after_count = df_cleaned.shape[0]
    print(f"After removing duplicates: {after_count} rows")

    removed_count = before_count - after_count
    print(f"Number of duplicates removed: {removed_count} rows")

    return df_cleaned


def filter_by_duration(df):

    if 'Duration' in df.columns:

        duration_numeric = pd.to_numeric(df['Duration'], errors='coerce')
        duration_numeric = duration_numeric.apply(lambda x: x / 1000 if x >= 1000 else x)
        mask = (duration_numeric >= 30) | duration_numeric.isna()
        filtered_df = df[mask]
        removed_rows = len(df) - len(filtered_df)
        print(f"Number of rows filtered out by duration: {removed_rows}")
        return filtered_df

    else:
        print("The 'Duration' column is not present in the DataFrame.")
        return df



def seconds_to_hms(seconds):
    minutes = int(seconds // 60)
    seconds = int(round(seconds % 60))
    return f"{minutes}m {seconds:02}s"

def duration_stats(df, column_name='Duration'):

    if column_name in df.columns:
        df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
        df[column_name] = df[column_name].apply(lambda x: x / 1000 if x >= 1000 else x)
        df_filtered = df[df[column_name].notna()]

        if not df_filtered.empty:
            min_duration = df_filtered[column_name].min()
            max_duration = df_filtered[column_name].max()
            mean_duration = df_filtered[column_name].mean()

            min_duration_hms = seconds_to_hms(min_duration)
            max_duration_hms = seconds_to_hms(max_duration)
            mean_duration_hms = seconds_to_hms(mean_duration)

            return {
                "Shortest track duration": min_duration_hms,
                "Longest track duration": max_duration_hms,
                "Average track duration": mean_duration_hms
            }
        else:
            print("No valid duration data available after filtering 'N/A' or invalid values.")
            return None
    return None

# **LastFM**

In [None]:
last_fm_api_key = config['last_fm_api_key']
user_name = 'slazur83'

base_url = "http://ws.audioscrobbler.com/2.0/"
params = {
    'method': 'user.getrecenttracks',
    'user': user_name,
    'api_key': last_fm_api_key,
    'format': 'json',
    'limit': 200,
    'page': 1
}

base_filename = "lastfm_tracks.csv"
directory = '.'
drive_folder = '/content/drive/MyDrive/Dane z aplikacji/LastFM/'

def format_date(date_str):
    if date_str == 'N/A':
        return 'N/A'

    try:
        return datetime.strptime(date_str, '%d %b %Y, %H:%M:%S').strftime('%Y-%m-%d %H:%M')
    except ValueError:
        try:
            return datetime.strptime(date_str, '%d %b %Y, %H:%M').strftime('%Y-%m-%d %H:%M')
        except ValueError:
            print(f"Date format error for: {date_str}")
            return 'Invalid Date Format'

def get_recent_tracks(params, max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(base_url, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as http_err:
            if response.status_code >= 500:
                retries += 1
                wait_time = 2 ** retries
                print(f"Server error {response.status_code}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"HTTP error occurred: {http_err}")
                raise
        except requests.exceptions.RequestException as err:
            print(f"Error during requests: {err}")
            raise
    print(f"Failed after {max_retries} retries.")
    return None

csv_file = base_filename
with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['artist', 'track', 'album', 'playback_date'])

    while True:
        data = get_recent_tracks(params)
        if not data:
            break

        tracks = data.get('recenttracks', {}).get('track', [])
        if not tracks:
            break

        for track in tracks:
            artist = track.get('artist', {}).get('#text', 'N/A')
            track_name = track.get('name', 'N/A')
            album = track.get('album', {}).get('#text', 'N/A')
            playback_date = format_date(track.get('date', {}).get('#text', 'N/A'))

            writer.writerow([artist, track_name, album, playback_date])

        params['page'] += 1

print(f"Data written to {csv_file}")

shutil.copy(csv_file, drive_folder)
print(f"The file has been saved in {drive_folder}.")

In [None]:
drive_folder = '/content/drive/MyDrive/Dane z aplikacji/LastFM/'
base_filename = "lastfm_tracks.csv"
csv_file = base_filename

def convert_to_datetime(date_str):
    try:
        return pd.to_datetime(date_str, format='%Y-%m-%d %H:%M', errors='coerce')
    except (ValueError, TypeError) as e:
        print(f"Error converting {date_str}: {e}")
        return pd.NaT

df_lastfm = pd.read_csv(drive_folder + csv_file, header=0)
df_lastfm.columns = ['Artist', 'Track', 'Album', 'Date']

df_lastfm['Date'] = df_lastfm['Date'].apply(convert_to_datetime)
df_lastfm['Date'] = df_lastfm['Date'].dt.strftime('%Y-%m-%d %H:%M')

df_lastfm['Source'] = 'LastFM'
df_lastfm['Account'] = 'slazur83'

df_lastfm[['Artist', 'Track']] = df_lastfm.apply(
    lambda row: standardize_artist_and_track(row['Artist'], row['Track']), axis=1, result_type="expand")

df_lastfm = remove_duplicates(df_lastfm, ['Date', 'Artist', 'Track'])

df_lastfm = filter_by_duration(df_lastfm)

df_lastfm = df_lastfm.copy().assign(**{'Sub-source': 'N/A'})

stats = duration_stats(df_lastfm)
if stats:
    print(stats)

# **YouTube Music**

In [None]:
folder_paths = {
    'riwanna85': '/content/drive/MyDrive/Dane z aplikacji/Google/riwanna85/YouTube i YouTube Music/historia/',
    'slazur83': '/content/drive/MyDrive/Dane z aplikacji/Google/slazur83/YouTube i YouTube Music/historia/'
}
file_name = 'historia oglądania.json'

def extract_artist(subtitles):
    return subtitles[0].get('name', '').split(' - ')[0] if isinstance(subtitles, list) and subtitles else ''

def extract_song_title(title):
    return title.replace("Obejrzano: ", "") if title.startswith("Obejrzano: ") else title

def load_and_process_data(folder_path, account_name):
    source_file = os.path.join(folder_path, file_name)
    if not os.path.isfile(source_file):
        print(f"Brak pliku historii: {source_file}")
        return pd.DataFrame(columns=['Artist', 'Track', 'Date', 'Duration', 'Source', 'Account'])

    df = pd.read_json(source_file, encoding='utf-8')
    df = df[df['header'] == 'YouTube Music'].copy()
    df['Artist'] = df['subtitles'].apply(extract_artist)
    df['Track'] = df['title'].apply(extract_song_title)
    df['Date'] = pd.to_datetime(df['time'], format='ISO8601').dt.strftime('%Y-%m-%d %H:%M')
    df['Source'], df['Account'], df['Duration'] = 'YouTube Music', account_name, 'N/A'
    return df[['Artist', 'Track', 'Date', 'Duration', 'Source', 'Account']]

df_ytmusic = pd.concat(
    [load_and_process_data(path, account) for account, path in folder_paths.items()],
    ignore_index=True
)

df_ytmusic[['Artist', 'Track']] = df_ytmusic.apply(
    lambda row: standardize_artist_and_track(row['Artist'], row['Track']), axis=1, result_type="expand"
)

df_ytmusic = remove_duplicates(df_ytmusic, ['Date', 'Artist', 'Track'])

df_ytmusic = filter_by_duration(df_ytmusic)

df_ytmusic = df_ytmusic.copy().assign(**{'Sub-source': 'N/A'})

stats = duration_stats(df_ytmusic)
if stats:
    print(stats)

In [None]:
def summarize_account_data(account_data):
    return account_data.groupby('Account').agg(
        start_date=('Date', 'min'), end_date=('Date', 'max'), row_count=('Account', 'size')
    ).reset_index()

final_summary = summarize_account_data(df_ytmusic)

print("\nFinal Summary of Accounts")
print(tabulate(final_summary, headers='keys', tablefmt='pretty'))

overall_start_date, overall_end_date, overall_row_count = (
    df_ytmusic['Date'].min(), df_ytmusic['Date'].max(), df_ytmusic.shape[0]
) if not df_ytmusic.empty else (None, None, 0)

print(f"\nOverall Summary:\nDate range: from {overall_start_date} to {overall_end_date}\nTotal Row count: {overall_row_count}\n")

# **Spotify**

In [None]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_spotify_data(base_path, account_name):
    mydata_folders = glob.glob(os.path.join(base_path, "MyData*"))
    df_list = []

    for folder in mydata_folders:
        takeout_name = os.path.basename(folder)

        all_files = glob.glob(os.path.join(folder, "StreamingHistory*.json")) + \
                    glob.glob(os.path.join(folder, "Streaming_History_Audio_*.json")) + \
                    glob.glob(os.path.join(folder, "Streaming_History_Video_*.json"))

        music_files = {file for file in all_files if 'podcast' not in os.path.basename(file).lower() and 'video' not in os.path.basename(file).lower()}

        logging.info(f"\nFiles found in {takeout_name} for account {account_name}:")
        for file in music_files:
            logging.info(f" - {file}")

            try:
                with open(file, 'r', encoding='utf-8') as f:
                    if f.read().strip():
                        df = pd.read_json(file)

                        if 'endTime' in df.columns:
                            df = df.rename(columns={'endTime': 'Date', 'artistName': 'Artist', 'trackName': 'Track', 'msPlayed': 'Duration'})
                        elif 'ts' in df.columns and 'master_metadata_track_name' in df.columns:
                            df = df.rename(columns={'ts': 'Date', 'master_metadata_album_artist_name': 'Artist', 'master_metadata_track_name': 'Track', 'ms_played': 'Duration'})
                            df['Date'] = pd.to_datetime(df['Date'], errors='coerce', utc=True)
                            df['Date'] = df['Date'].dt.floor('min')

                        df['Account'] = account_name
                        df['Takeout'] = takeout_name
                        df_list.append(df[['Artist', 'Track', 'Date', 'Duration', 'Account', 'Takeout']])
                    else:
                        logging.warning(f"Skipping empty file: {file}")
            except ValueError as e:
                logging.error(f"Error reading {file}: {e} - Skipping this file.")

    if df_list:
        combined_df = pd.concat(df_list, ignore_index=True)
    else:
        combined_df = pd.DataFrame()

    return combined_df

df1 = load_spotify_data('/content/drive/MyDrive/Dane z aplikacji/Spotify/slazur83@gmail.com/', 'slazur83')
df2 = load_spotify_data('/content/drive/MyDrive/Dane z aplikacji/Spotify/zethar182@gmail.com/', 'zethar182')

df_spotify = pd.concat([df1, df2], ignore_index=True)
df_spotify['Source'] = 'Spotify'

df_spotify['Date'] = pd.to_datetime(df_spotify['Date'], errors='coerce', utc=True)
df_spotify = df_spotify.dropna(subset=['Date'])

df_spotify['Date'] = df_spotify['Date'].dt.strftime('%Y-%m-%d %H:%M')

df_spotify[['Artist', 'Track']] = df_spotify.apply(
    lambda row: standardize_artist_and_track(row['Artist'], row['Track']), axis=1, result_type="expand"
)

df_spotify = remove_duplicates(df_spotify, ['Date', 'Artist', 'Track'])

df_spotify = filter_by_duration(df_spotify)

df_spotify = df_spotify.copy().assign(**{'Sub-source': 'N/A'})

stats = duration_stats(df_spotify)
if stats:
    print(stats)

In [None]:
def summarize_account_data(account_data):
    takeout_summary = account_data.groupby('Takeout').agg(
        start_date=('Date', 'min'),
        end_date=('Date', 'max'),
        row_count=('Takeout', 'count')
    ).reset_index()

    takeout_summary['Account'] = account_data['Account'].iloc[0]
    return takeout_summary[['Account', 'Takeout', 'start_date', 'end_date', 'row_count']]

final_summary = pd.concat([
    summarize_account_data(df_spotify[df_spotify['Account'] == 'zethar182']),
    summarize_account_data(df_spotify[df_spotify['Account'] == 'slazur83'])
], ignore_index=True)

print("\nFinal Summary of Accounts")
print(tabulate(final_summary, headers='keys', tablefmt='pretty'))

overall_start_date = df_spotify['Date'].min() if not df_spotify.empty else None
overall_end_date = df_spotify['Date'].max() if not df_spotify.empty else None
overall_row_count = df_spotify.shape[0] if not df_spotify.empty else 0

print(f"\nOverall Summary:\nDate range: from {overall_start_date} to {overall_end_date}\nTotal Row count: {overall_row_count}\n")

# **Deezer**

In [None]:
deezer_file = '/content/drive/MyDrive/Dane z aplikacji/Deezer/4519420622.xlsx'

df_deezer = pd.read_excel(deezer_file, sheet_name="10_listeningHistory")

df_deezer = df_deezer.rename(columns={
    'Song Title': 'Track',
    'Album Title': 'Album',
    'Listening Time': 'Duration',
    'Platform Name': 'Platform'
})

df_deezer = df_deezer.drop(columns=['ISRC'], errors='ignore')

df_deezer['Source'] = 'Deezer'
df_deezer['Account'] = 'slazur83'

df_deezer['Date'] = pd.to_datetime(df_deezer['Date'], errors='coerce')
df_deezer['Date'] = df_deezer['Date'].dt.strftime('%Y-%m-%d %H:%M')

df_deezer[['Artist', 'Track']] = df_deezer.apply(
    lambda row: standardize_artist_and_track(row['Artist'], row['Track']), axis=1, result_type="expand"
)

df_deezer = df_deezer.dropna(subset=['Date'])

df_deezer = remove_duplicates(df_deezer, ['Date', 'Artist', 'Track'])

df_deezer = filter_by_duration(df_deezer)

df_deezer = df_deezer.copy().assign(**{'Sub-source': 'N/A'})

stats = duration_stats(df_deezer)
if stats:
    print(stats)

**Final DataFrame**

In [None]:
merged = pd.concat([df_spotify, df_ytmusic, df_lastfm, df_deezer], ignore_index=True)
merged['Date'] = pd.to_datetime(merged['Date'])

nulls = merged['Date'].isna().sum()
if nulls > 0:
    if nulls < 15:
        merged.dropna(subset=['Date'], inplace=True)
        print(f'Dropped {nulls} rows with missing Date values.')
    else:
        raise ValueError(f'There are {nulls} null values in the Date column that need attention.')
else:
    print('No null values found in the Date column.\n')

columns_order = ['Date', 'Artist', 'Track', 'Album', 'Duration', 'Source', 'Sub-source', 'Account', 'Platform', 'Platform Model', 'IP Address']
merged = merged.reindex(columns=columns_order).copy()
merged['Date'] = pd.to_datetime(merged['Date'])
merged = merged.sort_values(by='Date', inplace=False)

all_merged = merged.copy()
all_merged["Source"] = 'All Merged'
all_merged['Sub-source'] = merged['Source']
all_merged_no_duplicates = all_merged.drop_duplicates(subset=['Date', 'Artist', 'Track'], keep='first')
final_df = pd.concat([merged, all_merged_no_duplicates], ignore_index=True)

In [None]:
from pandas import Timedelta

lastfm_df = final_df[final_df['Sub-source'] == 'LastFM'].copy()
other_df = final_df[(final_df['Sub-source'] != 'LastFM') & (final_df['Source'] == 'All Merged')].copy()

for minutes in range(1, 10):

    lastfm_temp_df = lastfm_df.copy()
    lastfm_temp_df['Date'] = lastfm_temp_df['Date'] + Timedelta(minutes=minutes)
    temp_combined_df = pd.concat([other_df, lastfm_temp_df])
    temp_combined_df = temp_combined_df.drop_duplicates(subset=['Date', 'Artist', 'Track'], keep=False)
    lastfm_df = lastfm_df[lastfm_df.index.isin(temp_combined_df.index)]

checked_df = pd.concat([other_df, lastfm_df]).drop_duplicates(subset=['Date', 'Artist', 'Track'])
final_df = pd.concat([merged, checked_df], ignore_index=True)

In [None]:
for source in final_df['Source'].unique():
    source_data = final_df[final_df['Source'] == source]

    total_rows = source_data.shape[0]
    start_date = source_data['Date'].min().strftime('%Y-%m-%d %H:%M') if total_rows > 0 else "N/A"
    end_date = source_data['Date'].max().strftime('%Y-%m-%d %H:%M') if total_rows > 0 else "N/A"
    distinct_artists = source_data['Artist'].nunique()
    distinct_tracks = source_data['Track'].nunique()

    print(f"Source: {source}")
    print(f"  Total rows: {total_rows}")
    print(f"  Start date: {start_date}")
    print(f"  End date: {end_date}")
    print(f"  Distinct artists: {distinct_artists}")
    print(f"  Distinct tracks: {distinct_tracks}\n")

In [None]:
output_path1 = '/content/drive/MyDrive/Skrypty/Tableau/Outputs/music_tracks.csv'
output_path2 = '/content/drive/MyDrive/Skrypty/Tableau/Outputs/music_tracks.xlsx'

final_df.to_csv(output_path1, index=False)
print(f'Data successfully exported to {output_path1}')

final_df.to_excel(output_path2, index=False)
print(f'Data successfully exported to {output_path2}')

**Matching entries in LastFM**

In [None]:
merged_data = final_df.copy()

merged_data['Date'] = pd.to_datetime(merged_data['Date'])
merged_data['Duration'] = pd.to_numeric(merged_data['Duration'], errors='coerce')

filtered_sources = merged_data.query("Duration.isna() or Duration >= 30")

lastfm_data = merged_data.query("Source == 'LastFM'").copy()
lastfm_data['key'] = lastfm_data[['Artist', 'Track']].apply(tuple, axis=1)

def match_within_period(row, lastfm_data):
    time_window = pd.Timedelta(minutes=5)
    matching_rows = lastfm_data[
        (lastfm_data['key'] == row['key']) &
        (abs(lastfm_data['Date'] - row['Date']) <= time_window)
    ]
    return not matching_rows.empty

report = []

for source in merged_data['Source'].unique():
    if source in ('LastFM', 'All Merged'):
        continue

    source_data = merged_data.query("Source == @source")

    start_date = lastfm_data['Date'].min()
    end_date = source_data['Date'].max()

    filtered_source_data = source_data.query("@start_date <= Date <= @end_date").copy()
    filtered_source_data['key'] = filtered_source_data[['Artist', 'Track']].apply(tuple, axis=1)

    filtered_source_data['Exists_in_LastFM'] = filtered_source_data.apply(
        match_within_period, lastfm_data=lastfm_data, axis=1
    )

    matching_count = filtered_source_data['Exists_in_LastFM'].sum()
    total_source_count = len(filtered_source_data)
    matching_percentage = (matching_count / total_source_count * 100) if total_source_count > 0 else 0

    report.append({
        'Source': source,
        'Start Date': start_date.strftime("%Y-%m-%d"),
        'End Date': end_date.strftime("%Y-%m-%d"),
        'Total Entries': total_source_count,
        'Matching Entries': matching_count,
        'Matching Percentage': f"{matching_percentage:.2f}%"
    })

for entry in report:
    print(f"Source: {entry['Source']}")
    print(f"  Start Date: {entry['Start Date']}")
    print(f"  End Date: {entry['End Date']}")
    print(f"  Total Entries: {entry['Total Entries']}")
    print(f"  Matching Entries: {entry['Matching Entries']}")
    print(f"  Percentage of Matching Entries in LastFM: {entry['Matching Percentage']}\n")


In [None]:
filtered_df = final_df[(final_df['Source'] == 'All Merged') &
                       (final_df['Date'].dt.month == 9) &
                       (final_df['Date'].dt.year == 2024) &
                       (final_df['Date'].dt.day == 14)]

filtered_df_sorted = filtered_df.sort_values(by='Date')
display(filtered_df)


In [None]:
unique_artists = final_df["Artist"].dropna().unique()
unique_artists = [str(artist) for artist in unique_artists]

unique_tracks = final_df["Track"].dropna().unique()
unique_tracks = [str(track) for track in unique_tracks]

sorted_artists = sorted(unique_artists)
sorted_tracks = sorted(unique_tracks)

artist_track_df = final_df[["Artist", "Track"]].dropna().drop_duplicates()
artist_track_df = artist_track_df.sort_values(by=["Artist", "Track"])

artist_track_counts = final_df.groupby(["Artist", "Track"]).size().reset_index(name="Count")
artist_track_counts = artist_track_counts.sort_values(by="Count", ascending=False)

In [None]:
for artist in sorted_artists:
  print(artist)