# ETL Pipeline for music and fashion data

### Extract (E)
- Download Kaggle Datasets
- Store raw datasets in /datasets/raw/x_data

### Transform (T)
- Clean data, if necessary
- Standardize formats; dates, capitalization, classes
- Remove non-overlapping dates, only keep rows from both data sets where dates overlap

### Load (L)
- Store processed datasets in /datasets/process/x_data

In [1]:
import pandas as pd
from scripts.music_info import get_album_info
import numpy as np
import os

In [2]:
# Fashion data
customers_df = pd.read_csv('../datasets/raw/fashion_data/customers.csv')
discounts_df = pd.read_csv('../datasets/raw/fashion_data/discounts.csv')
products_df = pd.read_csv('../datasets/raw/fashion_data/products.csv')
transactions_df = pd.read_csv('../datasets/raw/fashion_data/transactions.csv')

# Music data
music_df = pd.read_csv('../datasets/raw/music_data/universal_top_spotify_songs.csv')

  customers_df = pd.read_csv('../datasets/raw/fashion_data/customers.csv')


## Fashion columns to keep

### Customers
- id
- city
- country
- gender
- age (use dob)
- job

In [3]:
country_code_mapping = {
    "United States": "US",  
    "中国": "CN",               
    "España": "ES",            
    "Deutschland": "DE",    
    "France": "FR",           
    "United Kingdom": "GB",    
    "Portugal": "PT",   
}

In [4]:
customers_df.drop(columns=['Name', 'Email', 'Telephone'], axis=1, inplace=True)
customers_df['Age'] = pd.to_datetime('today').year - pd.to_datetime(customers_df['Date Of Birth'], errors='coerce').dt.year
customers_df.drop(columns=['Date Of Birth'], axis=1, inplace=True)
customers_df.fillna({'Job Title': 'Unknown'}, inplace=True)
customers_df.head()

Unnamed: 0,Customer ID,City,Country,Gender,Job Title,Age
0,1,New York,United States,M,Unknown,22
1,2,New York,United States,M,Records manager,25
2,3,New York,United States,F,Unknown,22
3,4,New York,United States,M,Proofreader,29
4,5,New York,United States,F,Exercise physiologist,27


In [5]:
customers_df['Country'].value_counts()
customers_df['Country'] = customers_df['Country'].map(country_code_mapping)

In [6]:
customers_df.head()

Unnamed: 0,Customer ID,City,Country,Gender,Job Title,Age
0,1,New York,US,M,Unknown,22
1,2,New York,US,M,Records manager,25
2,3,New York,US,F,Unknown,22
3,4,New York,US,M,Proofreader,29
4,5,New York,US,F,Exercise physiologist,27


### Discounts

- Discount
- Category (has null)
- Sub Category (has null)
- Season (use start and end)

In [7]:
season_mapping = {
    1: 'Winter',
    2: 'Winter',
    3: 'Spring',
    4: 'Spring',
    5: 'Spring',
    6: 'Summer',
    7: 'Summer',
    8: 'Summer',
    9: 'Fall',
    10: 'Fall',
    11: 'Fall',
    12: 'Winter'
}

discounts_df.fillna({'Category': 'Unknown'}, inplace=True)
discounts_df.fillna({'Sub Category': 'Unknown'}, inplace=True)
discounts_df['Season'] = discounts_df['Start'].apply(lambda x: season_mapping[pd.to_datetime(x, errors='coerce').month] if pd.notnull(x) else 'Unknown')
discounts_df.drop(columns=['Start', 'End', 'Description'], axis=1, inplace=True)

In [8]:
discounts_df.head()

Unnamed: 0,Discont,Category,Sub Category,Season
0,0.4,Feminine,Coats and Blazers,Winter
1,0.4,Feminine,Sweaters and Knitwear,Winter
2,0.4,Masculine,Coats and Blazers,Winter
3,0.4,Masculine,Sweaters and Sweatshirts,Winter
4,0.4,Children,Coats,Winter


### Products

- Id
- Category
- Sub Category
- Description EN
- Color

In [9]:
products_df.drop(columns=['Description PT', 'Description DE', 'Description FR', 'Description ES', 'Description ZH', 'Sizes', 'Production Cost'], axis=1, inplace=True)
products_df.fillna({'Color': 'Unknown'}, inplace=True)

In [10]:
products_df.head()

Unnamed: 0,Product ID,Category,Sub Category,Description EN,Color
0,1,Feminine,Coats and Blazers,Sports Velvet Sports With Buttons,Unknown
1,2,Feminine,Sweaters and Knitwear,Luxurious Pink Denim With Buttons,PINK
2,3,Feminine,Dresses and Jumpsuits,Black Tricot Printed Tricot,BLACK
3,4,Feminine,Shirts and Blouses,Basic Cotton Blouse,Unknown
4,5,Feminine,T-shirts and Tops,Basic Cotton T-Shirt,Unknown


### Transactions

- Customer ID
- Product ID
- Size
- Color
- Unit Price
- Quantity
- Date
- Discount
- Store ID
- Currency
- Payment Method
- Invoice Total

In [11]:
transactions_df.drop(columns=['Invoice ID', 'Line', 'Line Total', 'Employee ID', 'Currency Symbol', 'SKU', 'Transaction Type'], axis=1, inplace=True)

In [12]:
transactions_df.head()

Unnamed: 0,Customer ID,Product ID,Size,Color,Unit Price,Quantity,Date,Discount,Store ID,Currency,Payment Method,Invoice Total
0,47162,485,M,,80.5,1,2023-01-01 15:42:00,0.0,1,USD,Cash,126.7
1,47162,2779,G,,31.5,1,2023-01-01 15:42:00,0.4,1,USD,Cash,126.7
2,47162,64,M,NEUTRAL,45.5,1,2023-01-01 15:42:00,0.4,1,USD,Cash,126.7
3,10142,131,M,BLUE,70.0,1,2023-01-01 20:04:00,0.4,1,USD,Cash,77.0
4,10142,716,L,WHITE,26.0,1,2023-01-01 20:04:00,0.0,1,USD,Cash,77.0


## Music Columns to keep

### Music

#### Columns
- ID
- name
- artists
- daily_rank
- daily_movement
- weekly_movement
- country
- snapshot_date
- popularity
- (all music data)

#### Only keep rows where countries align with customer dataset
No need for information on countires not included in fashion data

#### Segment dataset
Create 2 tables
**PK**: spotify_id

- Casual song information
['spotify_id', 'name', 'artists', 'daily_rank', 'daily_movement', 'weekly_movement', 'country', 'snapshot_date', 'popularity', 'is_explicit', 'album_name', 'album_release_date']
- Advanced song information
['spotify_id', 'duration_ms', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']

In [13]:
music_df.dropna(subset=['country'], inplace=True)

In [14]:
print(music_df['country'].unique())

['ZA' 'VN' 'VE' 'UY' 'US' 'UA' 'TW' 'TR' 'TH' 'SV' 'SK' 'SG' 'SE' 'SA'
 'RO' 'PY' 'PT' 'PL' 'PK' 'PH' 'PE' 'PA' 'NZ' 'NO' 'NL' 'NI' 'NG' 'MY'
 'MX' 'MA' 'LV' 'LU' 'LT' 'KZ' 'KR' 'JP' 'IT' 'IS' 'IN' 'IL' 'IE' 'ID'
 'HU' 'HN' 'HK' 'GT' 'GR' 'FR' 'FI' 'ES' 'EG' 'EE' 'EC' 'DO' 'DK' 'DE'
 'CZ' 'CR' 'CO' 'CL' 'CH' 'CA' 'BY' 'BR' 'BO' 'BG' 'BE' 'AU' 'AT' 'AR'
 'AE' 'GB']


In [15]:
print(customers_df['Country'].unique())

['US' 'CN' 'DE' 'GB' 'FR' 'ES' 'PT']


In [16]:
allowed_countries = set(customers_df['Country'])
music_df = music_df[music_df['country'].isin(allowed_countries)]
music_df.reset_index(drop=True, inplace=True)
print(music_df['country'].unique())

['US' 'PT' 'FR' 'ES' 'DE' 'GB']


In [17]:
music_df.head()

Unnamed: 0,spotify_id,name,artists,daily_rank,daily_movement,weekly_movement,country,snapshot_date,popularity,is_explicit,...,key,loudness,mode,speechiness,acousticness,instrumentalness,liveness,valence,tempo,time_signature
0,42UBPzRMh5yyz0EDPr6fr1,Manchild,Sabrina Carpenter,1,0,49,US,2025-06-11,89,True,...,7,-5.087,1,0.0572,0.122,0.0,0.317,0.811,123.01,4
1,04emojnbYkrRmv5qtJcgVP,What I Want (feat. Tate McRae),"Morgan Wallen, Tate McRae",2,0,-1,US,2025-06-11,92,False,...,9,-3.92,1,0.0262,0.639,0.0,0.148,0.495,115.998,4
2,2RkZ5LkEzeHGRsmDqKwmaJ,Ordinary,Alex Warren,3,1,-1,US,2025-06-11,95,False,...,2,-6.141,1,0.06,0.704,7e-06,0.055,0.391,168.115,3
3,0FTmksd2dxiE5e3rWyJXs6,back to friends,sombr,4,-1,0,US,2025-06-11,98,False,...,1,-2.291,1,0.0301,9.4e-05,8.8e-05,0.0929,0.235,92.855,4
4,4gfrYDtaRmp6HPvN80V2ob,I Got Better,Morgan Wallen,5,0,0,US,2025-06-11,88,False,...,0,-4.587,1,0.0295,0.576,0.000564,0.155,0.542,84.986,4


In [18]:
music_info_df = music_df[['spotify_id', 'name', 'artists', 'daily_rank', 'daily_movement', 'weekly_movement', 'country', 'snapshot_date', 'popularity', 'is_explicit', 'album_name', 'album_release_date']].copy()
music_features_df = music_df[['spotify_id', 'duration_ms', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']].copy()

In [19]:
# Many duplicate songs, remove duplicates

music_features_df = music_features_df.drop_duplicates(subset=['spotify_id']).reset_index(drop=True)
music_features_df.head()

Unnamed: 0,spotify_id,duration_ms,danceability,energy,key,loudness,mode,speechiness,acousticness,instrumentalness,liveness,valence,tempo,time_signature
0,42UBPzRMh5yyz0EDPr6fr1,213645,0.731,0.685,7,-5.087,1,0.0572,0.122,0.0,0.317,0.811,123.01,4
1,04emojnbYkrRmv5qtJcgVP,184517,0.657,0.699,9,-3.92,1,0.0262,0.639,0.0,0.148,0.495,115.998,4
2,2RkZ5LkEzeHGRsmDqKwmaJ,186964,0.368,0.694,2,-6.141,1,0.06,0.704,7e-06,0.055,0.391,168.115,3
3,0FTmksd2dxiE5e3rWyJXs6,199032,0.436,0.723,1,-2.291,1,0.0301,9.4e-05,8.8e-05,0.0929,0.235,92.855,4
4,4gfrYDtaRmp6HPvN80V2ob,204963,0.598,0.798,0,-4.587,1,0.0295,0.576,0.000564,0.155,0.542,84.986,4


## Final check

Processed datasets:
- customers_df
- discounts_df
- products_df
- transactions_df
- music_info_df
- music_features_df

Check for:
- null
- nan
- duplicates
- improper format

##### Customers

In [20]:
customers_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1643306 entries, 0 to 1643305
Data columns (total 6 columns):
 #   Column       Non-Null Count    Dtype 
---  ------       --------------    ----- 
 0   Customer ID  1643306 non-null  int64 
 1   City         1643306 non-null  object
 2   Country      1643306 non-null  object
 3   Gender       1643306 non-null  object
 4   Job Title    1643306 non-null  object
 5   Age          1643306 non-null  int64 
dtypes: int64(2), object(4)
memory usage: 75.2+ MB


##### Discounts

In [21]:
discounts_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 181 entries, 0 to 180
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Discont       181 non-null    float64
 1   Category      181 non-null    object 
 2   Sub Category  181 non-null    object 
 3   Season        181 non-null    object 
dtypes: float64(1), object(3)
memory usage: 5.8+ KB


##### Products

In [22]:
products_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17940 entries, 0 to 17939
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   Product ID      17940 non-null  int64 
 1   Category        17940 non-null  object
 2   Sub Category    17940 non-null  object
 3   Description EN  17940 non-null  object
 4   Color           17940 non-null  object
dtypes: int64(1), object(4)
memory usage: 700.9+ KB


##### Transactions

In [23]:
transactions_df.isnull().sum()

Customer ID             0
Product ID              0
Size               413102
Color             4350783
Unit Price              0
Quantity                0
Date                    0
Discount                0
Store ID                0
Currency                0
Payment Method          0
Invoice Total           0
dtype: int64

In [24]:
category_colors = products_df.groupby("Category")['Color'] \
                     .agg(lambda x: x.mode().iloc[0] if not x.mode().empty else None) \
                     .to_dict()

subcategory_colors = products_df.groupby("Sub Category")['Color'] \
                        .agg(lambda x: x.mode().iloc[0] if not x.mode().empty else None) \
                        .to_dict()

def get_product_color(pid):
    product = products_df.loc[products_df['Product ID'] == pid].iloc[0] 
    cat = product['Category']
    subcat = product['Sub Category']

    cat_color = category_colors.get(cat)
    sub_color = subcategory_colors.get(subcat)

    # Randomly pick between category/subcategory if both exist
    choices = [c for c in [cat_color, sub_color] if c is not None]
    if not choices:
        return None
    return np.random.choice(choices)

unique_products = transactions_df['Product ID'].unique()
product_color_dict = {pid: get_product_color(pid) for pid in unique_products}

product_color_df = pd.DataFrame.from_dict(product_color_dict, orient='index', columns=['Color_fill']).reset_index()
product_color_df.rename(columns={'index': 'Product ID'}, inplace=True)

transactions_df = transactions_df.merge(
    product_color_df,
    on='Product ID',
    how='left'
)

transactions_df['Color'] = transactions_df['Color'].fillna(transactions_df['Color_fill'])

transactions_df.drop(columns=['Color_fill'], inplace=True)

transactions_df.drop(columns=['Size'], inplace=True)

##### Music Info

In [25]:
music_info_df.info()

missing_ids = music_info_df.loc[
    music_info_df['album_name'].isna() | music_info_df['album_release_date'].isna(),
    'spotify_id'
].unique()

album_data = {track_id: get_album_info(track_id) for track_id in missing_ids}

for track_id, info in album_data.items():
    mask = music_info_df['spotify_id'] == track_id
    music_info_df.loc[mask, 'album_name'] = info['album_name']
    music_info_df.loc[mask, 'album_release_date'] = info['release_date']


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 166080 entries, 0 to 166079
Data columns (total 12 columns):
 #   Column              Non-Null Count   Dtype 
---  ------              --------------   ----- 
 0   spotify_id          166080 non-null  object
 1   name                166078 non-null  object
 2   artists             166078 non-null  object
 3   daily_rank          166080 non-null  int64 
 4   daily_movement      166080 non-null  int64 
 5   weekly_movement     166080 non-null  int64 
 6   country             166080 non-null  object
 7   snapshot_date       166080 non-null  object
 8   popularity          166080 non-null  int64 
 9   is_explicit         166080 non-null  bool  
 10  album_name          165976 non-null  object
 11  album_release_date  165976 non-null  object
dtypes: bool(1), int64(4), object(7)
memory usage: 14.1+ MB


In [26]:
music_info_df.dropna(subset=['name', 'artists'], inplace=True)
music_info_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 166078 entries, 0 to 166079
Data columns (total 12 columns):
 #   Column              Non-Null Count   Dtype 
---  ------              --------------   ----- 
 0   spotify_id          166078 non-null  object
 1   name                166078 non-null  object
 2   artists             166078 non-null  object
 3   daily_rank          166078 non-null  int64 
 4   daily_movement      166078 non-null  int64 
 5   weekly_movement     166078 non-null  int64 
 6   country             166078 non-null  object
 7   snapshot_date       166078 non-null  object
 8   popularity          166078 non-null  int64 
 9   is_explicit         166078 non-null  bool  
 10  album_name          166078 non-null  object
 11  album_release_date  166078 non-null  object
dtypes: bool(1), int64(4), object(7)
memory usage: 15.4+ MB


##### Music Features

In [27]:
music_features_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4131 entries, 0 to 4130
Data columns (total 14 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   spotify_id        4131 non-null   object 
 1   duration_ms       4131 non-null   int64  
 2   danceability      4131 non-null   float64
 3   energy            4131 non-null   float64
 4   key               4131 non-null   int64  
 5   loudness          4131 non-null   float64
 6   mode              4131 non-null   int64  
 7   speechiness       4131 non-null   float64
 8   acousticness      4131 non-null   float64
 9   instrumentalness  4131 non-null   float64
 10  liveness          4131 non-null   float64
 11  valence           4131 non-null   float64
 12  tempo             4131 non-null   float64
 13  time_signature    4131 non-null   int64  
dtypes: float64(9), int64(4), object(1)
memory usage: 452.0+ KB


## Save transformed data as CSV

In [28]:
df_names = {
    'customers.csv': customers_df,
    'discounts.csv': discounts_df,
    'products.csv': products_df,
    'transactions.csv': transactions_df,
    'music_info.csv': music_info_df,
    'music_features.csv': music_features_df
}

fashion_folder = '../datasets/processed/fashion_data/'
music_folder = '../datasets/processed/music_data/'

assert os.path.isdir(fashion_folder), f"Folder does not exist: {fashion_folder}"
assert os.path.isdir(music_folder), f"Folder does not exist: {music_folder}"

In [29]:
for filename, df in df_names.items():
    if 'music' in filename:
        df.to_csv(os.path.join(music_folder, filename), index=False)
    else:
        df.to_csv(os.path.join(fashion_folder, filename), index=False)