In [None]:
import pickle

import numpy as np
import pandas as pd
import plotly.express as px

from sklearn import metrics
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectKBest
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder, LabelEncoder, MinMaxScaler, MaxAbsScaler, RobustScaler, Normalizer, QuantileTransformer, PowerTransformer
from sklearn.svm import SVC

# NOTE(review): later cells also use the names `xgb` and `plt`, which no cell in this
# notebook imports — add the corresponding imports once the environment is confirmed.

In [None]:
# Read the CSV into a DataFrame.
df_missing = pd.read_csv('../../../data/clean_data/data_clean_missing.csv')

# Keep only rows whose duration_ms is at most 700,000 ms (longer tracks are
# treated as outliers). Rows holding the -1 sentinel pass this filter too.
within_duration_cap = df_missing['duration_ms'].le(700000)

# Turn the -1 and '?' placeholders into NaN wherever they occur in the frame.
df_constrain = df_missing.loc[within_duration_cap].replace([-1, '?'], np.nan)



In [None]:
# Fill gaps in duration_ms with the column mean and gaps in tempo with the
# column median. The filled values are stored in new *_imp columns.
imp_mean = SimpleImputer(strategy='mean')
duration_filled = imp_mean.fit_transform(df_constrain[['duration_ms']])
df_constrain['duration_ms_imp'] = duration_filled

imp_median = SimpleImputer(strategy='median')
tempo_filled = imp_median.fit_transform(df_constrain[['tempo']])
df_constrain['tempo_imp'] = tempo_filled

In [None]:
# Drop the raw columns now that their imputed counterparts exist.
# The result is rebound instead of using inplace=True, and errors='ignore'
# lets the cell be re-run without a KeyError once the columns are gone.
df_constrain = df_constrain.drop(columns=['duration_ms', 'tempo'], errors='ignore')

# Fold the 'Rap' genre into 'Hip-Hop'. The replaced Series is assigned back to
# the column: calling replace(..., inplace=True) on df['col'] is chained
# assignment and does not update the frame under pandas Copy-on-Write.
df_constrain['music_genre'] = df_constrain['music_genre'].replace({'Rap': 'Hip-Hop'})

In [None]:
# Bind a second name to the same DataFrame object (plain assignment; no copy is made).
df_test = df_constrain

In [None]:
# Re-balance the dataset by discarding a fixed-size random sample of Hip-Hop rows.
np.random.seed(10)  # seed NumPy's global RNG so the draw below is repeatable

remove_n = 5000

# Index labels of every row currently tagged as Hip-Hop.
hip_hop_index = df_test.loc[df_test['music_genre'] == 'Hip-Hop'].index

# Sample remove_n of those labels without replacement and drop the matching rows.
drop_indices = np.random.choice(hip_hop_index, size=remove_n, replace=False)
df_subset = df_test.drop(index=drop_indices)

In [None]:
# Column groupings used to decide how each feature is encoded or scaled.

# --- categorical columns ---
cat_feats = [
    'artist_name',
    'key',
    'mode',
    'music_genre',
]
cat_feats_ohe = ['artist_name', 'mode']  # one-hot encoded
cat_feats_le = ['music_genre']           # label encoded

# --- numerical columns ---
# NOTE(review): this list still names the raw 'duration_ms' and 'tempo' columns,
# whereas num_feats_scale below uses the '*_imp' names.
num_feats = [
    'popularity',
    'acousticness',
    'danceability',
    'duration_ms',
    'energy',
    'instrumentalness',
    'liveness',
    'loudness',
    'speechiness',
    'tempo',
    'valence',
]
num_feats_scale = [
    'popularity',
    'duration_ms_imp',
    'loudness',
    'tempo_imp',
]
num_feats_imp_mean = ['duration_ms']  # imputed with the mean
num_feats_imp_median = ['tempo']      # imputed with the median

In [None]:
# Columns kept for the reduced ("selected") feature set.
feats = [
    'popularity',
    'acousticness',
    'danceability',
    'duration_ms_imp',
    'energy',
    'instrumentalness',
    'loudness',
    'speechiness',
    'tempo_imp',
    'valence',
    'mode_Major',
    'mode_Minor',
]

In [None]:
# Integer-encode every column listed in cat_feats_le. A single LabelEncoder is
# shared and refitted on each column in turn.
le = LabelEncoder()

encoded_labels = df_subset[cat_feats_le].apply(le.fit_transform)
df_subset[cat_feats_le] = encoded_labels

In [None]:
# Rewrite the characters "[", "]" and "<" inside df_subset's column labels
# ("[" -> "{", "]" -> "}", "<" -> "^"). Only labels that exist on df_subset at
# this point are touched; cell values are left as they are.
bracket_map = str.maketrans({"[": "{", "]": "}", "<": "^"})
df_subset.columns = df_subset.columns.str.translate(bracket_map)

In [None]:
# One-hot encode the columns listed in cat_feats_ohe.
df_out = pd.get_dummies(df_subset, columns=cat_feats_ohe)

# The generated dummy labels embed the raw category values (e.g. artist names),
# so "[", "]" and "<" can appear in column names only at this point.
# Replace them on the encoded frame; XGBoost has rejected feature names
# containing these characters.
df_out.columns = df_out.columns.str.translate(
    str.maketrans({"[": "{", "]": "}", "<": "^"})
)

In [None]:
# Map each key name to its position in the chromatic scale starting at C.
# The values are integers so the column ends up numeric rather than a column
# of digit strings.
dic = {'C': 0, 'C#': 1, 'D': 2, 'D#': 3, 'E': 4,
       'F': 5, 'F#': 6, 'G': 7, 'G#': 8, 'A': 9,
       'A#': 10, 'B': 11}

# Values without an entry in the map (NaN, or integers from a previous run of
# this cell) pass through unchanged, so the cell is safe to re-run.
# pd.to_numeric then yields a numeric dtype and raises if an unexpected
# non-numeric label is left over.
df_out['key'] = pd.to_numeric(df_out['key'].map(lambda k: dic.get(k, k)))

In [None]:
# Standardise the columns listed in num_feats_scale.
# The scaler is fitted on the unscaled values held in df_subset, so re-running
# this cell does not scale already-scaled data.
scaler = StandardScaler()

df_out[num_feats_scale] = scaler.fit_transform(df_subset[num_feats_scale])

# Later cells read the prepared frame under the name `df_scaled`, which was
# never assigned. Bind it here; this is the same object as df_out, not a copy.
df_scaled = df_out


In [None]:
# Target: the music_genre column.
df_target = df_scaled['music_genre']

# Full feature matrix: every column except the target.
df_input = df_scaled.drop(columns='music_genre')

# Reduced feature matrix for the feature-selection experiment: only the
# columns named in `feats`.
df_input_selection = df_scaled[feats]

In [None]:
# Hold out 30% of the rows of the full feature matrix as a test set.
# The fixed random_state keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    df_input,
    df_target,
    test_size=0.3,
    random_state=1001,
)

In [None]:
# Same 70/30 split and random_state as above, applied to the reduced feature matrix.
X_train_select, X_test_select, y_train_select, y_test_select = train_test_split(
    df_input_selection,
    df_target,
    test_size=0.3,
    random_state=1001,
)

# XGBoost

In [None]:
# Wrap the full feature matrix and the target in an XGBoost DMatrix.
# NOTE(review): `data_dmatrix` is not read by any later cell shown in this notebook.
data_dmatrix = xgb.DMatrix(data=df_input, label=df_target)

In [None]:
# Fit an XGBoost model on the full feature matrix and report hold-out metrics.
# NOTE(review): the estimator class is XGBRegressor although the objective is the
# multi-class 'multi:softmax' — confirm that XGBClassifier was not intended.
xg_reg = xgb.XGBRegressor(
    objective='multi:softmax',
    colsample_bytree=0.3,
    learning_rate=0.2,
    max_depth=5,
    alpha=0.5,
    n_estimators=100,
    num_class=10,
)
xg_reg.fit(X_train, y_train)

preds = xg_reg.predict(X_test)

# One importance value per input column.
print(xg_reg.feature_importances_)

# Accuracy: share of test rows whose predicted label equals the true label.
test_accuracy = accuracy_score(y_test, preds)
print('Model accuracy score: {0:0.4f}'.format(test_accuracy))

# Per-class precision: of the rows predicted as a class, the fraction that truly belong to it.
per_class_precision = metrics.precision_score(y_test, preds, average=None)
print("Precision:", per_class_precision)

# Per-class recall: of the rows that truly belong to a class, the fraction predicted as it.
per_class_recall = metrics.recall_score(y_test, preds, average=None)
print("Recall:", per_class_recall)

In [None]:
# Set the default figure size before plotting so it applies to the figure
# created below (rcParams only affect figures created after the change).
plt.rcParams['figure.figsize'] = [10, 10]

# Plot the 10 most important features. `xg_reg` is the only XGBoost model fitted
# in this notebook.
# NOTE(review): if a separate model trained on X_train_select was meant here,
# fit it first and plot that model instead.
xgb.plot_importance(xg_reg, max_num_features=10);

# Pipelining

In [None]:
# NOTE(review): none of the column names below appear in the frames built in the
# earlier cells — confirm which dataset this preprocessing section targets.

# Numeric columns: fill gaps with the median, then standardise.
numeric_features = [
    'ApplicantIncome',
    'CoapplicantIncome',
    'total_income',
    'LoanAmount',
    'Loan_Amount_Term',
]
numeric_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
)

# Categorical columns: one-hot encode; categories unseen during fit are ignored.
categorical_features = [
    'Gender',
    'Married',
    'Dependents',
    'Education',
    'Self_Employed',
    'Credit_History',
    'Property_Area',
]
categorical_transformer = OneHotEncoder(handle_unknown="ignore")

# Route each column group through its own transformer.
num_branch = ("num", numeric_transformer, numeric_features)
cat_branch = ("cat", categorical_transformer, categorical_features)
preprocessor = ColumnTransformer(transformers=[num_branch, cat_branch])


# Full prediction pipeline: preprocessing, then univariate selection of the
# 3 best-scoring features, then a random forest classifier.
clf = Pipeline(
    steps=[
        ("preprocessor", preprocessor),
        ('f_classif', SelectKBest(k=3)),
        ("classifier", RandomForestClassifier()),
    ]
)

# Hold out 20% of the rows for testing. This rebinds the X_train / X_test /
# y_train / y_test names created by the earlier split.
# NOTE(review): `df_clean2` and `y` are not defined in any cell shown in this
# notebook — confirm where they are expected to come from.
X_train, X_test, y_train, y_test = train_test_split(df_clean2, y, test_size=0.2, random_state=0)

clf.fit(X_train, y_train)
print("model score: %.3f" % clf.score(X_test, y_test))

In [None]:
# Branch 1: project the inputs onto principal components.
pca = PCA()

# Branch 2: keep the top-scoring original columns (univariate selection).
selection = SelectKBest()

# Run both branches and concatenate their outputs into one feature matrix.
combined_features = FeatureUnion(
    transformer_list=[("pca", pca), ("univ_select", selection)]
)

# Linear-kernel support vector classifier fed by the combined features.
svm = SVC(kernel="linear")
pipeline = Pipeline(steps=[("features", combined_features), ("svm", svm)])

# Hyper-parameters to search: number of components, number of selected columns, SVM C.
param_grid = {
    "features__pca__n_components": [1, 2, 3],
    "features__univ_select__k": [1, 2, 3],
    "svm__C": [0.1, 1, 10],
}

# Exhaustive search over the grid; the best configuration is refitted at the end.
grid_search = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    verbose=10,
    refit=True,
)

# NOTE(review): `X` and `y` are not defined in any cell shown in this notebook.
grid_search.fit(X, y)


In [None]:
# Search space for the preprocessing + SelectKBest + random forest pipeline:
# imputation strategy, number of selected features (1 to 12), and forest size.
param_grid = {
    "preprocessor__num__imputer__strategy": ["mean", "median"],
    'f_classif__k': list(range(1, 13)),
    "classifier__n_estimators": [1, 10, 100],
}

# 10-fold cross-validated grid search over `clf`.
grid_search = GridSearchCV(estimator=clf, param_grid=param_grid, cv=10)
grid_search

In [None]:
# Run the search on the training split, then show the winning parameter combination.
grid_search.fit(X_train, y_train)

print("Best params:", grid_search.best_params_, sep="\n")

In [None]:
# Persist the fitted grid search to disk. The context manager closes the file
# handle even if pickling fails.
with open("model.p", "wb") as model_file:
    pickle.dump(grid_search, model_file)

In [None]:
# Report the cross-validated score of the best parameter combination found by the search.
print(f"Internal CV score: {grid_search.best_score_:.3f}")

In [None]:
# Score the refitted best estimator on the held-out test split and report it.
test_score = grid_search.score(X_test, y_test)
print("best random forest classifier from grid search: %.3f" % test_score)