# Data Pre-Processing and Feature Engineering 
### For regression model
This Notebook will process the created data and create features for the regression model. The Following data sources are used: <br>
dim_product: product fact table <br>
streaming: aggregated daily streaming data (2018/6/1/-2020/7/2) <br>
Note: New features can be added as a function. Ideally, the prepcrocessing and feature engineering are preferred to be completed in a ETL tool before data arrives in Sagemaker. 

In [78]:
#import packages
#!conda update pandas -y
#!conda update s3fs -y
#!pip install --upgrade numpy

import datetime
import pandas as pd
import numpy as np
import re
from pprint import pprint
from collections import Counter
from sklearn import preprocessing
from sklearn.preprocessing import PowerTransformer
from scipy import stats

## Ingest data and pre-processing

### Read in dim_product, filter product table

In [3]:
#read in dim_product as chunks and concat them together as the file is big
bucket='wmg-streaming-prediction-dev/streaming_data_processed'
filename = 'dim_product_all.csv'
data_location = 's3://{}/{}'.format(bucket, filename)

# Read in everything as string
col_names = pd.read_csv(data_location, nrows=0).columns # get headers 
types_dict = {}
types_dict.update({col: 'str' for col in col_names if col not in types_dict})

csv_chunks = pd.read_csv(data_location, chunksize = 10000, dtype = types_dict, index_col=0)
df_product = pd.concat(chunk for chunk in csv_chunks)

In [4]:
#eliminate tracks with more than 6 artists to keep artist related features clean. This step can be discussed 
#further with domain expert on how to clean up product fact table. 
num_names = pd.DataFrame(df_product.
                         groupby('artist_id')['artist_display_name'].nunique()).reset_index()
num_names.rename(columns={'artist_display_name':'num_artist_names'},inplace=True)
num_names_lim = num_names[num_names['num_artist_names']<=6]
df_product = df_product.merge(num_names_lim,
                                       on='artist_id',
                                       how='inner')

### Read in streaming data, pre-processing

In [5]:
#read in provided streaming data
##data anomolies were removed by Siva: stream date after release date, and stream date or release date in future

bucket_streaming = 'wmg-streaming-prediction-dev/product-key-agg'
filename = 'rna.gz'
data_location = 's3://{}/{}'.format(bucket_streaming, filename)

colnames = ['product_key', 'ISRC', 'release_date', 'stream_date', 'streams']
types_dict = {'product_key': 'str'}

df_streaming = pd.read_csv(data_location, compression='gzip')
df_streaming.columns = colnames

# Convert dates to datetime
df_streaming['stream_date'] = pd.to_datetime(df_streaming['stream_date'],errors='coerce')
df_streaming['release_date'] = pd.to_datetime(df_streaming['release_date'],errors='coerce')

In [6]:
# Get total streams for each song
df_streaming_agg = df_streaming.groupby(['ISRC']).agg({'streams': 'sum'}).reset_index().sort_values('streams', ascending=False)
# Filter applied: Keep only songs that have > 1000 streams to limit the size of streaming data
df_streaming_agg = df_streaming_agg[df_streaming_agg['streams'] > 1000]
df_streaming_lim = df_streaming.merge(df_streaming_agg['ISRC'],on='ISRC',how='inner')

In [7]:
# Filter applied to product table to limit size 
df_streaming_lim_ids = pd.DataFrame(df_streaming_lim['ISRC'].unique(),columns=['ISRC'])
df_product_lim = df_product.merge(df_streaming_lim_ids,
                                  left_on = 'product_id',
                                  right_on = 'ISRC',
                                  how = 'inner')

In [49]:
#data type and formatting
df_product_lim['product_key'] = df_product_lim['product_key'].astype('str')
df_streaming_lim['product_key'] = df_streaming_lim['product_key'].astype('str')
df_product_lim.rename(columns = {'first_global_release_date': 'release_date'}, inplace = True)

In [8]:
#uncomment and run this cell if OOM error
del df_product
del df_streaming

## Create Features
The Following features will be created:
artist total tracks released, artist total albums released,
artist total streams, if the track is a collaboration track,

#### Feature 1 & 2: artist's total tracks released and total albums released

In [9]:
def create_features_product(df):
    #input: cleaned product dimension table
    #output: artist's total number of tracks released (calculated as number of unique products associated by artist) 
    #and total number of album released (calculated as number of unique project_ids associated by artist).
    df_product_numtracks = df.groupby(['artist_id']).agg({'product_key': 'count', 'project_key': pd.Series.nunique}).reset_index().sort_values('product_key', ascending=False)
    df_product_numtracks.rename(columns={'product_key': 'artist_num_tracks'}, inplace=True)
    df_product_numtracks.rename(columns={'project_key': 'artist_num_albums'}, inplace=True)

    print('Shape of numtracks df:\t{}'.format(df_product_numtracks.shape))
    print('Number of unique artists in original df:\t{}'.format(df['artist_id'].nunique()))
    
    return df_product_numtracks

In [11]:
#the dataframe is saved in s3 
#link to it 's3://wmg-streaming-prediction-dev/streaming_data_processed/tracks_albums_by_artist.csv'
df_numtracks = create_features_product(df_product_lim)

Shape of numtracks df:	(6605, 3)
Number of unique artists in original df:	6605


#### Feature 3:  if the track is a Collaboration track

In [12]:
# Create variable for whether song is collaboration (calculated as if the track's name contains "feat")
# filtered product table is saved in S3
# link to the filtered product table: 's3://wmg-streaming-prediction-dev/streaming_data_processed/dim_product_limited.csv'
df_product_lim['Collaboration'] = df_product_lim['product_title'].str.contains('Feat\.', regex=True, flags=re.IGNORECASE).astype(int)

#### Feature 4: total streams per artist

In [13]:
# Function for number of streams by artist
def create_num_streams_by_artist(stream_df, prod_df):
    #input: cleaned streaming table and product dim table
    #output: artist's total streams (calculated as number of sum of streams by artist).
    df_artist = stream_df.merge(prod_df[['artist_id','product_id']],
                         left_on='ISRC',
                         right_on='product_id',
                         how = 'inner')    
    df_streaming_agg = df_artist.groupby(['artist_id']).agg({'streams': 'sum'}).reset_index().sort_values('streams', ascending=False)
    df_streaming_agg.rename(columns={'Streams':'Total artist streams'},inplace=True)    
    return df_streaming_agg

In [14]:
# the dataframe (artist, total stream per artist) is saved in S3
# link to the data: 's3://wmg-streaming-prediction-dev/streaming_data_processed/total_streams_by_artist.csv'
df_artist_total_streams = create_num_streams_by_artist(df_streaming_lim, df_product_lim)

#### Feature 5, 6, & 7: day 1, day 2 and day 3 streams per track after release

In [15]:
def create_first3days_volume(df):
    # Restrict to rows where stream date and release date are equal, 1 day apart, 2 days apart, 3 days apart
    mask = ((df['stream_date']-df['release_date']==datetime.timedelta(days=0)) |
            (df['stream_date']-df['release_date']==datetime.timedelta(days=1)) |
            (df['stream_date']-df['release_date']==datetime.timedelta(days=2)) |
            (df['stream_date']-df['release_date']==datetime.timedelta(days=3)))

    df_streaming_dayvolume = df[mask]

    # Create helper variables for each day
    df_streaming_dayvolume['day0'] = df_streaming_dayvolume['stream_date']==df_streaming_dayvolume['release_date']
    df_streaming_dayvolume['day1'] = df_streaming_dayvolume['stream_date']==(df_streaming_dayvolume['release_date']+datetime.timedelta(days=1))
    df_streaming_dayvolume['day2'] = df_streaming_dayvolume['stream_date']==(df_streaming_dayvolume['release_date']+datetime.timedelta(days=2))
    df_streaming_dayvolume['day3'] = df_streaming_dayvolume['stream_date']==(df_streaming_dayvolume['release_date']+datetime.timedelta(days=3))
    
    print('Shape of daily streaming data:\t{}'.format(df_streaming_dayvolume.shape))
    
    # Partition into different dataframes and rename cols
    day0 = df_streaming_dayvolume[df_streaming_dayvolume['day0']]
    day0 = day0[['product_key','ISRC','streams']]
    day0 = day0.rename(columns={'streams':'day0_volume'})
    print('Number of songs with day 0 data:\t{}'.format(len(day0)))

    day1 = df_streaming_dayvolume[df_streaming_dayvolume['day1']]
    day1 = day1[['product_key','ISRC','streams']]
    day1 = day1.rename(columns={'streams':'day1_volume'})
    print('Number of songs with day 1 data:\t{}'.format(len(day1)))

    day2 = df_streaming_dayvolume[df_streaming_dayvolume['day2']]
    day2 = day2[['product_key','ISRC','streams']]
    day2 = day2.rename(columns={'streams':'day2_volume'})
    print('Number of songs with day 2 data:\t{}'.format(len(day2)))

    day3 = df_streaming_dayvolume[df_streaming_dayvolume['day3']]
    day3 = day3[['product_key','ISRC','streams']]
    day3 = day3.rename(columns={'streams':'day3_volume'})
    print('Number of songs with day 3 data:\t{}'.format(len(day3)))

    # Merge everything together
    df_daily_volumes=day0.merge(day1[['ISRC','day1_volume']].merge(day2[['ISRC','day2_volume']].merge(day3[['ISRC','day3_volume']],on='ISRC',how='outer'),on='ISRC',how='outer'),on='ISRC',how='outer')
    df_daily_volumes.drop(columns=['product_key'],inplace=True)

    print('Shape of final data frame:\t{}'.format(df_daily_volumes.shape))
    
    return df_daily_volumes
    

In [16]:
df_daily_volumes = create_first3days_volume(df_streaming_lim)

Shape of daily streaming data:	(196016, 9)
Number of songs with day 0 data:	46759
Number of songs with day 1 data:	49296
Number of songs with day 2 data:	49535
Number of songs with day 3 data:	50426
Shape of final data frame:	(52543, 5)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user

#### Feature 8, 9 & target variable: Artist previous release streams, artist previous release first week streams, and target variable: song's first week streams.

In [41]:
def head_sum(x):
    #helper function to get the first seven numbers' sum of a column.
    return x.head(7).sum()

def create_artist_prev_avg_streams(stream_df, prod_df):
    #input: cleaned streaming table and product dim table
    #output: 
    #feature 8: avg_stream_per_project(dataframe): artist's previous release's streams
    #feature 9: first_week_streams(dataframe): artist's previous release's first week streams
    #target variable: current song's first week's stream volumes
    
    # add project and artist related information to streaming data, and clean the streaming table
    stream_with_project = stream_df.merge(prod_df[['product_key', 'project_key', 'product_title', 'artist_id']], how = 'left', on = 'product_key')
    #remove null artist values and and artist not recorded in product table
    stream_with_project = stream_with_project[~stream_with_project['artist_id'].isnull()]
    stream_with_project = stream_with_project[stream_with_project['artist_id'].isin(prod_df['artist_id'])]
    stream_with_project['stream_date_datetime'] = pd.to_datetime(stream_with_project['stream_date'])
    stream_with_project['day_of_week'] = stream_with_project['stream_date_datetime'].dt.dayofweek
    
    #feature 8: artist most recent previous release's streams. (calculated as total streams up to date of the previous release by artist).
    # note that if the artist's last release has more than one song (more than 1 unique ISRC at the same release date and with the same project id), then we calculate an average of the these songs.
    # get total streams per song
    total_stream_per_product = stream_with_project.groupby('product_key').agg({'artist_id': 'max', 'project_key': 'max', 'product_title': 'max', 'release_date': 'max', 'streams': 'sum'}).reset_index()
    # get average streams per project with the same release date and same artist, and all the song titles for the current project 
    avg_stream_per_project = total_stream_per_product.groupby(['artist_id', 'project_key', 'release_date']).agg({'streams': 'mean',  'product_title': (lambda column: "|".join(column))}).reset_index()
    # get average streams per project with the same release date and same artist, and all the song titles from last release
    avg_stream_per_project['shifted_stream'] = avg_stream_per_project.sort_values(['artist_id','release_date'], ascending=True).groupby(['artist_id'])['streams'].shift(1)
    avg_stream_per_project['shifted_prod_title'] = avg_stream_per_project.sort_values(['artist_id','release_date'], ascending=True).groupby(['artist_id'])['product_title'].shift(1)
    avg_stream_per_project['shifted_project_key'] = avg_stream_per_project.sort_values(['artist_id','release_date'], ascending=True).groupby(['artist_id'])['project_key'].shift(1)
    avg_stream_per_project['shifted_stream'].fillna(0, inplace = True) ## fill na with 0: assuming no album before
    avg_stream_per_project.columns = ['artist_id', 'project_key', 'release_date', 'streams', 'product_title', 'previous_release_avg_streams', 'prev_prod_title', 'prev_project_key']
    
    #feature 9: artist most recent previous release's first week streams. (calculated as first seven days' streams since release of the previous release by artist).
    # note that if the artist's last release has more than one song (more than 1 unique ISRC at the same release date and with the same project id), then we calculate an average of the these songs.
    stream_with_project = stream_with_project.sort_values(['product_key','stream_date_datetime'], ascending=True)
    column_funcs = {'streams': head_sum, # first seven days sum
                'project_key': max, # all the vals should be the same
                'artist_id': max, # all the vals should be the same
                'release_date': max }  # all the vals should be the same
    collapsed = stream_with_project.groupby('product_key').aggregate(column_funcs)
    first_week_streams = collapsed.reset_index().sort_values(['streams'], ascending=False)
    first_week_streams.columns = ['product_key', 'first_week_streams', 'project_key', 'artist_id', 'release_date']
    # get average first week streams per project with the same release date and same artist, and all the song titles for the current project 
    first_week_streams = first_week_streams.groupby(['artist_id', 'project_key', 'release_date']).agg({'first_week_streams': 'mean'}).reset_index()
    # get average first week streams per project with the same release date and same artist, and all the song titles from last release
    first_week_streams['previous_release_first_week_stream'] = first_week_streams.sort_values(['artist_id','release_date'], ascending=True).groupby(['artist_id'])['first_week_streams'].shift(1)
    first_week_streams['previous_release_first_week_stream'].fillna(0, inplace = True) ## fill na with 0: assuming no album before
    
    # creat target variable: each song's first week total streams
    first_week_streams_target = collapsed.reset_index().sort_values(['streams'], ascending=False)
    first_week_streams_target.columns = ['product_key', 'first_week_streams', 'project_key', 'artist_id', 'release_date']
    #del stream_with_project
    return avg_stream_per_project, first_week_streams, first_week_streams_target

In [42]:
avg_prev_streams, avg_prev_first_week_streams, target_var = create_artist_prev_avg_streams(df_streaming_lim, df_product_lim)

#### Merge all created features together

In [54]:
avg_prev_streams.dtypes

artist_id                               object
project_key                             object
release_date                    datetime64[ns]
streams                                float64
product_title                           object
previous_release_avg_streams           float64
prev_prod_title                         object
prev_project_key                        object
dtype: object

In [57]:
# Combine all files
print(df_product_lim.shape)
df_final = df_product_lim.merge(df_artist_total_streams,
                                     on='artist_id',
                                     how='inner')
print(df_final.shape)

df_final = df_final.merge(df_numtracks,
                          on='artist_id',
                          how='inner')
print(df_final.shape)

df_final = df_final.merge(df_daily_volumes,
                          on='ISRC',
                          how='inner')
print(df_final.shape)
df_final['release_date'] = pd.to_datetime(df_final['release_date'])
df_final = df_final.merge(avg_prev_streams[['project_key', 'release_date',  'previous_release_avg_streams', 'prev_prod_title', 'prev_project_key']],
                          how = 'left', 
                          on = ['project_key', 'release_date'])
print(df_final.shape)

df_final = df_final.merge(avg_prev_first_week_streams[['project_key', 'release_date', 'previous_release_first_week_stream']], 
                          how = 'left', 
                          on = ['project_key', 'release_date'])
print(df_final.shape)

df_final = df_final.merge(target_var[['product_key', 'first_week_streams']],
                                  how = 'left', 
                                  on = 'product_key')
print(df_final.shape)

(49294, 14)
(49294, 15)
(49294, 17)
(44036, 21)
(44036, 24)
(44036, 25)
(44036, 26)


#### one-hot encoding categorical feature: genre
to keep the categorical feature size down, we filtered out genres with less than a few tens songs, and merged same/similar genre terms together based on our understanding. This mapping can be modified in the future by domain expert.


In [63]:
def process_genre(feature_df):
    # input: merged feature dataframe.
    # output: merged feature dataframe with genre converted to one hot encoded feature columns.
    #filter out genres with less than a few tens songs
    genre_filter_out = feature_df.groupby(['major_genre_code']).count().reset_index().sort_values(by = 'product_key', ascending = False)['major_genre_code'][-19:]
    df_final_feature = feature_df[~feature_df['major_genre_code'].isin(genre_filter_out)]
    # create a mapping to merged same/similar genre terms together
    equiv = {2697: 2714, 2698: 2699, 2700:2699, 2707: 1000,2717: 2744, 2719: 1015685, 2727: 2710, 2728: 1000, 2738: 1000, 2743: 2767, 2750: 1001, 
            2752: 1000, 2756: 2755, 2768: 2744, 2774: 2744, 3933: 2699, 1000127: 2734,1000128: 2734, 1000129: 2734, 1000130: 2734, 1000328: 1000,
            1000386: 1001, 1008945:1000, 1009585:1001, 1009905:1001, 1011382:1001, 1011385:2744,  1013145:2767, 1014465: 1001, 1014965:1001, 1006386: 1001,
            2705: 2732, 2771: 2767, 2710: 2744, 2724: 2744}
    df_final_feature['genre_mapped'] = df_final_feature['major_genre_code'].map(equiv)
    df_final_feature['genre_mapped'].fillna(df_final_feature['major_genre_code'], inplace = True) # for songs we are not able to find a match, use orginal genre code
    df_final_feature = pd.concat([df_final_feature,pd.get_dummies(df_final_feature['genre_mapped'], prefix='genre')],axis=1)
    return df_final_feature

In [64]:
df_final = process_genre(df_final)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._update_inplace(new_data)


#### Power transform total stream for the artist 
to normalize and bucket each artist's total stream; also avoid data leaking. You can also self-define artist's bucket with a threshold.

In [97]:
#initiate 
pt = PowerTransformer(method='box-cox', standardize=False,) 

#Fit the data to the powertransformer
scale = pt.fit(df_final[['streams']])

#Transform the data. you should transform the future data with the same scale above.
scaled_stream = pt.transform(df_final[['streams']])
df_final['total_streams_boxcox'] = scaled_stream

#### Final data quality check

In [101]:
df_final = df_final.fillna(0) # fill null values with 0 
#only keep data with first three day's volume bigger than 0
df_final = df_final[(df_final['day0_volume'] > 0) & (df_final['day1_volume'] > 0) & (df_final['day2_volume'] > 0)]
#only look at songs with first week's stream bigger than 200
df_final = df_final[df_final['first_week_streams'] > 200]
# clean out columns not for training
df_final.rename(columns = {'Collaboration': 'has_collaboration'}, inplace = True)
df_final =df_final[['product_key', 
        'has_collaboration',
       'day0_volume', 'day1_volume', 'day2_volume', 
       'previous_release_avg_streams', 'genre_mapped',
       'previous_release_first_week_stream', 'first_week_streams',
        'total_streams_boxcox', 'artist_num_tracks',
       'genre_1000128', 'genre_1000345', 'genre_1000346',
       'genre_1009046', 'genre_1009585', 'genre_1009905', 'genre_1011385',
       'genre_1014465', 'genre_1014965', 'genre_1015685', 'genre_2698',
       'genre_2699', 'genre_2700', 'genre_2705', 'genre_2706', 'genre_2708',
       'genre_2709', 'genre_2711', 'genre_2714', 'genre_2719', 'genre_2724',
       'genre_2727', 'genre_2729', 'genre_2732', 'genre_2734', 'genre_2736',
       'genre_2744', 'genre_2751', 'genre_2753', 'genre_2754', 'genre_2755',
       'genre_2758', 'genre_2767', 'genre_2774']]

In [103]:
# Output file
output_location = 's3://wmg-streaming-prediction-dev/streaming_data_processed'
df_final.to_csv('{}/df_final_feature_v4.csv'.format(output_location))

#### Split train and test

In [None]:
df_final = df_final.sample(frac=1, random_state=1729)

In [None]:
grouped_df = df_final.groupby('genre_mapped')
arr_list = [np.split(g, [int(.7 * len(g)), int(.9 * len(g))]) for i, g in grouped_df]

train_data = pd.concat([t[0] for t in arr_list])
validation_data = pd.concat([t[1] for t in arr_list])
test_data  = pd.concat([v[2] for v in arr_list])

In [None]:
def process_data(data, name):
    data = data.drop(columns = ['product_key', 'genre_mapped'])
    data = pd.concat([data['first_week_streams'], data.drop(['first_week_streams'], axis=1)], axis=1) 
    data.to_csv(name, header=False, index=False)

In [None]:
process_data(train_data, 'train_updated.csv')
process_data(validation_data, 'validation_updated.csv')

In [None]:
s3_input_train = boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('train_updated.csv')
s3_input_validation = boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('validation_updated.csv')