# Notebook 02 â€” Cleaning, Normalization & Combine

**Goal**: Unify column names, parse dates, make numeric, combine into `df_all`.

**Important**: Airtel and Wynk are the SAME platform. Airtel data will be merged into Wynk by matching on track/artist/album.


**Canonical Columns**:`['source', 'activity_period', 'year_month', 'store_name', 'country', 'artist', 'album', 'track', 'revenue', 'stream_count', 'unit_type', 'project_code']`

In [22]:
import pandas as pd
import numpy as np
from pathlib import Path
import datetime

DATA_DIR = Path('../data')
OUTPUT_DIR = Path('../outputs/cleaned')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Define Canonical Columns
CANONICAL_COLS = [
    'source', 'activity_period', 'year_month', 'store_name', 'country', 
    'artist', 'album', 'track', 'revenue', 'stream_count', 'unit_type', 'project_code'
]

In [23]:
# Load Raw Data
try:
    airtel_raw = pd.read_csv(DATA_DIR/'airtel-report.csv')
    jio_raw = pd.read_csv(DATA_DIR/'jiosaavn-report.csv')
    wynk_raw = pd.read_csv(DATA_DIR/'wynk-report.csv')
    print("Raw files loaded.")
except FileNotFoundError as e:
    print(f"Error: {e}")

Raw files loaded.


## Column Mapping Definitions
Define how each raw file maps to the canonical schema.

In [24]:
# Mappings based on EDA (Notebook 01)
# Key = Canonical Name, Value = Raw Column Name

AIRTEL_MAP = {
    'track': 'Song Name',
    'album': 'Album Name',
    'artist': 'Artist',
    'revenue': 'Rev'
    # Missing: stream_count, activity_period, country, isrc/project_code
}

JIO_MAP = {
    'track': 'song_name',
    'album': 'album_name',
    'artist': 'artist_name',
    'revenue': 'income',
    'stream_count': 'total',
    'project_code': 'isrc'
}

WYNK_MAP = {
    'track': 'song_name',
    'album': 'album_name',
    'artist': 'artist',
    'revenue': 'income',
    'stream_count': 'total',
    'project_code': 'isrc'
}

In [25]:
def clean_and_normalize(df, source_label, col_map, default_country='India'):
    df = df.copy()
    
    # 1. Rename columns based on map
    # Invert map for renaming: {Raw: Canonical}
    rename_map = {v: k for k, v in col_map.items()}
    df = df.rename(columns=rename_map)
    
    # 2. Add missing canonical columns with default values (NaN or specific)
    for col in CANONICAL_COLS:
        if col not in df.columns:
            df[col] = np.nan
            
    # 3. Set Source and Store Name (default to source_label if store_name missing)
    df['source'] = source_label
    if pd.isna(df['store_name']).all():
        df['store_name'] = source_label.capitalize()

    # 4. Clean Revenue
    # Remove currency symbols if present, then convert
    if df['revenue'].dtype == 'object':
        df['revenue'] = df['revenue'].astype(str).str.replace(r'[^\d.-]', '', regex=True)
    df['revenue'] = pd.to_numeric(df['revenue'], errors='coerce').fillna(0)
    
    # 5. Clean Stream Count
    if 'stream_count' in df.columns:
        df['stream_count'] = pd.to_numeric(df['stream_count'], errors='coerce').fillna(0)
    
    # 6. Date / Activity Period Parsing
    # TODO: If date is missing in CSV, we might need to infer from filename or user input.
    # For now, we look for common date columns if not already mapped.
    date_cols = [c for c in df.columns if 'date' in c.lower() or 'period' in c.lower()]
    if pd.isna(df['activity_period']).all() and date_cols:
        # Try the first found date column
        print(f"[{source_label}] Inferring date from column: {date_cols[0]}")
        df['activity_period'] = pd.to_datetime(df[date_cols[0]], errors='coerce')
    else:
        # Ensure it's datetime even if empty
        df['activity_period'] = pd.to_datetime(df['activity_period'], errors='coerce')
        
    # Create year_month
    df['year_month'] = df['activity_period'].dt.to_period('M').astype(str).replace('NaT', np.nan)

    # 7. Country Defaults
    if 'country' in df.columns:
        df['country'] = df['country'].fillna(default_country)
        
    # 8. String Normalization (Artist, Album, Track)
    for col in ['artist', 'album', 'track']:
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip().str.title()

    # Return only canonical columns to ensure schema match
    return df[CANONICAL_COLS]

In [26]:
# Apply Cleaning
print("Cleaning Airtel (will merge into Wynk)...")
airtel_clean = clean_and_normalize(airtel_raw, 'wynk', AIRTEL_MAP)  # Note: source='wynk'

print("Cleaning JioSaavn...")
jio_clean = clean_and_normalize(jio_raw, 'jiosaavn', JIO_MAP)

print("Cleaning Wynk...")
wynk_clean = clean_and_normalize(wynk_raw, 'wynk', WYNK_MAP)

# Check shapes before merge
print(f"Airtel (before merge): {airtel_clean.shape}")
print(f"Jio: {jio_clean.shape}")
print(f"Wynk (before merge): {wynk_clean.shape}")

Cleaning Airtel (will merge into Wynk)...
[wynk] Inferring date from column: activity_period
Cleaning JioSaavn...
[jiosaavn] Inferring date from column: activity_period
Cleaning Wynk...
[wynk] Inferring date from column: activity_period
Airtel (before merge): (4, 12)
Jio: (2666, 12)
Wynk (before merge): (6212, 12)


  df['year_month'] = df['activity_period'].dt.to_period('M').astype(str).replace('NaT', np.nan)
  df['year_month'] = df['activity_period'].dt.to_period('M').astype(str).replace('NaT', np.nan)
  df['year_month'] = df['activity_period'].dt.to_period('M').astype(str).replace('NaT', np.nan)


In [27]:
# Merge Airtel into Wynk (same platform)
print("\n" + "="*60)
print("MERGING AIRTEL INTO WYNK (SAME PLATFORM)")
print("="*60)

# Create normalized keys for matching
airtel_clean['match_key'] = (airtel_clean['track'].str.lower().str.strip() + '|' + 
                              airtel_clean['artist'].str.lower().str.strip() + '|' + 
                              airtel_clean['album'].str.lower().str.strip())

wynk_clean['match_key'] = (wynk_clean['track'].str.lower().str.strip() + '|' + 
                            wynk_clean['artist'].str.lower().str.strip() + '|' + 
                            wynk_clean['album'].str.lower().str.strip())

# Find matches
matched_keys = set(airtel_clean['match_key']) & set(wynk_clean['match_key'])
print(f"Found {len(matched_keys)} matching tracks between Airtel and Wynk")

# Split Airtel into matched and unmatched
airtel_matched = airtel_clean[airtel_clean['match_key'].isin(matched_keys)].copy()
airtel_unmatched = airtel_clean[~airtel_clean['match_key'].isin(matched_keys)].copy()

print(f"Airtel - Matched: {len(airtel_matched)}, Unmatched: {len(airtel_unmatched)}")

# For matched records, aggregate revenue and streams
if len(airtel_matched) > 0:
    # Aggregate Airtel matched data by match_key
    airtel_agg = airtel_matched.groupby('match_key', as_index=False).agg({
        'revenue': 'sum',
        'stream_count': 'sum'
    })
    
    # Merge aggregated Airtel data into Wynk
    wynk_clean = wynk_clean.merge(airtel_agg, on='match_key', how='left', suffixes=('', '_airtel'))
    
    # Add Airtel revenue/streams to Wynk where matched
    wynk_clean['revenue'] = wynk_clean['revenue'] + wynk_clean['revenue_airtel'].fillna(0)
    wynk_clean['stream_count'] = wynk_clean['stream_count'] + wynk_clean['stream_count_airtel'].fillna(0)
    
    # Drop temporary columns
    wynk_clean = wynk_clean.drop(columns=['revenue_airtel', 'stream_count_airtel'])
    
    print(f"âœ… Merged {len(airtel_matched)} Airtel records into existing Wynk records")

# Add unmatched Airtel records as new entries
if len(airtel_unmatched) > 0:
    airtel_unmatched = airtel_unmatched.drop(columns=['match_key'])
    wynk_clean = pd.concat([wynk_clean, airtel_unmatched], ignore_index=True)
    print(f"âœ… Added {len(airtel_unmatched)} new Airtel records to Wynk")

# Drop match_key column
wynk_clean = wynk_clean.drop(columns=['match_key'])

print(f"\nWynk (after merge): {wynk_clean.shape}")
print("="*60)

# Combine final dataframes (JioSaavn + Wynk)
df_all = pd.concat([jio_clean, wynk_clean], ignore_index=True, sort=False)

print(f"\nðŸ“Š FINAL Combined Shape: {df_all.shape}")
print(f"   - JioSaavn: {len(jio_clean):,} rows")
print(f"   - Wynk (incl. Airtel): {len(wynk_clean):,} rows")
display(df_all.head())


MERGING AIRTEL INTO WYNK (SAME PLATFORM)
Found 0 matching tracks between Airtel and Wynk
Airtel - Matched: 0, Unmatched: 4
âœ… Added 4 new Airtel records to Wynk

Wynk (after merge): (6216, 12)

ðŸ“Š FINAL Combined Shape: (8882, 12)
   - JioSaavn: 2,666 rows
   - Wynk (incl. Airtel): 6,216 rows


Unnamed: 0,source,activity_period,year_month,store_name,country,artist,album,track,revenue,stream_count,unit_type,project_code
0,jiosaavn,NaT,,Jiosaavn,India,Jojo Mrinal Mukherjee Prem Kumar,Tara Maa Ki Mahima Aapar,Tara Maiya Tere Darbar,0.675,9.0,,INT441651504
1,jiosaavn,NaT,,Jiosaavn,India,Sanchita Shaan Swarna,Hiya Deba Kaak,Lovely Lovely Night,0.375,5.0,,INT441629102
2,jiosaavn,NaT,,Jiosaavn,India,Indradeep Dasgupta,Kanamachi,Mon Bawra,36.15,482.0,,INT441632205
3,jiosaavn,NaT,,Jiosaavn,India,Raja Narayan Deb,Monchuri,Bolo Bolo Bolo Shobe,0.45,6.0,,INT441636701
4,jiosaavn,NaT,,Jiosaavn,India,Jojo Mrinal Mukherjee Prem Kumar,Tara Maa Ki Mahima Aapar,Natraj Nache Re,0.675,9.0,,INT441651502


## Post-Combine Normalization

In [28]:
# 1. Normalize Artist Names (Simple Strip/Lower for grouping, Title for display)
df_all['artist'] = df_all['artist'].fillna('Unknown Artist')

# 2. Normalize Store Names (Airtel is already merged into Wynk)
store_map = {
    'Jiosaavn': 'JioSaavn',
    'Wynk': 'Wynk Music'
}
df_all['store_name'] = df_all['store_name'].replace(store_map)

print("\nðŸ“Š FINAL PLATFORM DISTRIBUTION:")
print(df_all.groupby('store_name').agg({
    'revenue': 'sum',
    'stream_count': 'sum',
    'track': 'count'
}).rename(columns={'track': 'row_count'}))


ðŸ“Š FINAL PLATFORM DISTRIBUTION:
                 revenue  stream_count  row_count
store_name                                       
JioSaavn     6940.200000       92536.0       2666
Wynk Music  22760.579356      404885.0       6216


In [29]:
# Save to Parquet
output_path = OUTPUT_DIR / 'df_all.parquet'
df_all.to_parquet(output_path, index=False)
print(f"Saved canonical dataset to {output_path}")

Saved canonical dataset to ../outputs/cleaned/df_all.parquet


## Cleaning Log

In [30]:
log_path = Path('../logs/cleaning.log')
log_path.parent.mkdir(exist_ok=True)

with open(log_path, 'w') as f:
    f.write(f"Cleaning Log - {datetime.datetime.now()}\n")
    f.write("="*50 + "\n")
    f.write(f"Total Rows Processed: {len(df_all)}\n")
    f.write(f"Sources: {df_all['source'].unique()}\n")
    f.write(f"Missing Dates: {df_all['activity_period'].isna().sum()}\n")
    f.write(f"Total Revenue: {df_all['revenue'].sum():.2f}\n")
    
print(f"Log written to {log_path}")

Log written to ../logs/cleaning.log
