## Truc Tran -- i6213546
### Topics available
  ## Update the "/dbfs/sh-recommender-pred-data/introduction_blocks_topics.csv" file whenever a new topic is introduced
### Input for Data class:
  ## user features, block features, block started, block finished, question played, video played tables
### Input for Predictor class:
  ## training data and predicting data obtained from the Data class

In [0]:
import pandas as pd
import numpy as np
import multiprocessing
from sklearn.model_selection import train_test_split as tts
from sklearn.linear_model import LogisticRegression
from sklearn.metrics.pairwise import nan_euclidean_distances
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from imblearn.over_sampling import SMOTE
from habtic_data_ml_pipeline.utils.feature_generator_framework import FeaturesTableGenerator

## Data class: Get training and predicting data for probabilistic model

In [0]:
class Data:
  """
  Extract all features for training and scoring the probabilistic model.

  Args:
    user_ft: user features table (needs a "UserId" column)
    block_ft: block features table (needs a "BlockCode" column)
    block_start: block_started events
    block_finish: block_finished events
    question: question_played events
    video: video_played events

  Results (see run / extract_all_features):
    - Training features table (the latest state of each user-block pair is held out as the label)
    - Predicting features table
  """

  def __init__(self, user_ft, block_ft, block_start, block_finish, question, video):
    self.user_ft = user_ft
    self.block_ft = block_ft
    self.block_start = block_start
    self.block_finish = block_finish
    self.question = question
    self.video = video
    # Add the new introduction block here when a new topic is introduced.
    self.introduction_blocks = [
      'pa2-module1', 'better-breathing', 'bhh1-module1', 'moving-during-long-distance-travel', 'ilt2-module1',
      'iyp1-module1', 'my-private-moment', 'optimizing-memory', 'os1-module1', 'conversation-tensing-to-relax',
      'sleep-and-shift-work-p1', 'clc1-module1', 'conversation-dealing-with-conflicts', 'exploring-creativity',
      'regulating-emotions', 'kyb2-module1', 'conversation-working-behind-a-monitor', 'ub2-module1', 'sp2-module1',
      'wr1-module1', 'hhw1-module1', 'managing-sources-of-chronic-stress', 'ensuring-future-vitality', 'isc1-module1',
    ]
    self.session_features()
    self.block_cluster()
    self.user_block_last_info()

  def session_features(self):
    """
    Build one row per (UserId, BlockCode, AppSessionId) that has both a start and a finish event.
    Reads self.block_start and self.block_finish, stores the result in self.session.
    Returns:
        pd.DataFrame: time_start, id_start, time_finish, CompletionState and duration (seconds) per session
    Raises: None
    """
    keys = ["UserId", "BlockCode", "AppSessionId"]
    started = self.block_start.groupby(keys).agg(
      time_start=("TimestampUtc", "min"),
      id_start=("EventId", "max"),
    ).reset_index()
    finished = self.block_finish.groupby(keys).agg(
      time_finish=("TimestampUtc", "max"),
      CompletionState=("CompletionState", "max"),
    ).reset_index()
    session = started.merge(finished, on=keys, how="inner")
    # total_seconds() keeps whole days; timedelta.seconds silently wraps at 24h.
    session["duration"] = (session["time_finish"] - session["time_start"]).dt.total_seconds()
    self.session = session.reset_index(drop=True)
    return self.session

  def block_cluster(self, clique_size=10, epsilon_dist=1.e-5):
    """
    Makes cliques of blocks -- for each block, find the closest clique_size blocks
    (fewer when block_ft holds fewer blocks). Distances are computed on the
    standard-scaled numeric columns of self.block_ft.
    Args:
        clique_size (int, optional): The size of clique to calculate for each block
        epsilon_dist (float, optional): A small positive number added to each distance so that it is always positive
    Returns:
        pd.DataFrame: columns BlockCode, clique_blocks, clique_distances (stored in self.block_clique)
    Raises: None
    """
    columns = ["BlockCode", "clique_blocks", "clique_distances"]
    # Positional array: immune to a non-default index on block_ft.
    block_codes = self.block_ft["BlockCode"].to_numpy()
    n_blocks = len(block_codes)
    if n_blocks == 0:
      self.block_clique = pd.DataFrame(columns=columns)
      return self.block_clique

    scaled_features = StandardScaler().fit_transform(self.block_ft.select_dtypes("number"))
    distances = nan_euclidean_distances(scaled_features, scaled_features)
    # argpartition needs kth < n_blocks; cap it so small block tables do not crash.
    kth = min(clique_size + 1, n_blocks - 1)
    n_candidates = min(clique_size + 1, n_blocks)
    nearest = np.argpartition(distances, kth)[:, :n_candidates]

    rows = []
    for i, block_id in enumerate(block_codes):
      neighbours = [j for j in nearest[i] if j != i]  # the block itself is not part of its clique
      rows.append((
        block_id,
        [block_codes[j] for j in neighbours],
        [distances[i][j] + epsilon_dist for j in neighbours],
      ))
    self.block_clique = pd.DataFrame(rows, columns=columns)
    return self.block_clique

  # latest state of each (user-block)
  def user_block_last_info(self):
    """
    Keep, for every (UserId, BlockCode), the session(s) with the greatest time_finish.
    Returns:
        pd.DataFrame: UserId, BlockCode, id_start, time_finish, CompletionState (stored in self.latest_info)
    """
    pair = ["UserId", "BlockCode"]
    last_finish = self.session.groupby(pair)["time_finish"].max().reset_index(name="time_finish")
    latest = last_finish.merge(self.session, on=pair + ["time_finish"], how="inner")
    self.latest_info = latest[["UserId", "BlockCode", "id_start", "time_finish", "CompletionState"]].reset_index(drop=True)
    return self.latest_info

  # take out the latest state of each pair (user-block) to train the model
  def remove_latest_info(self):
    """
    Return self.session without the latest session of every (user-block) pair.
    Pairs whose CompletionState count is exactly 1 keep their only session.
    The original row index of self.session is preserved on the kept rows.
    """
    pair = ["UserId", "BlockCode"]
    played = self.session.groupby(pair)["CompletionState"].count().reset_index(name="total_times_played")
    repeated = played.loc[played["total_times_played"] != 1, pair]
    held_out = self.latest_info.merge(repeated, on=pair, how="inner")
    # Drop every held-out session in one pass (a per-row drop against self.session
    # would only ever remove the last one).
    return self.session[~self.session["id_start"].isin(held_out["id_start"])]

  # take out every possible (user-block) pair for score estimation
  def combine_not_trigger_block(self, df_user_block):
    """
    Append a zero-filled row for every (user, block) combination of user_ft x block_ft
    that is missing from df_user_block (matched on its "user_and_block" key).
    Returns df_user_block itself when nothing is missing.
    """
    users = self.user_ft["UserId"].unique()
    blocks = self.block_ft["BlockCode"].unique()
    known = set(df_user_block["user_and_block"])  # O(1) membership instead of a list scan per pair

    new_users, new_blocks, new_keys = [], [], []
    for block in blocks:
      for user in users:
        key = user + " " + block
        if key not in known:
          new_users.append(user)
          new_blocks.append(block)
          new_keys.append(key)
    if not new_keys:
      return df_user_block

    identity = {"UserId": new_users, "BlockCode": new_blocks, "user_and_block": new_keys}
    filler = {c: identity.get(c, 0) for c in df_user_block.columns}
    missing = pd.DataFrame(filler, columns=df_user_block.columns, index=range(len(new_keys)))
    return pd.concat([df_user_block, missing]).reset_index(drop=True)

  # extract neighboring features for user-block
  def total_avg_clique(self, x, features, df_user_block):
    """
    Neighbourhood features of one (user-block) row.
    Args:
        x: row with UserId, BlockCode, clique_blocks, clique_distances
        features: feature column names to aggregate
        df_user_block: table holding those features for every (user-block) pair
    Returns:
        np.ndarray: for each feature its total over the block and its clique, followed by,
        for each feature, the mean of the non-zero, non-missing values weighted by
        1 / (scaled distance ** 2); the block itself has scaled distance 1.
    """
    # A block without a clique (e.g. NaN after a left merge) contributes only itself.
    has_clique = isinstance(x.clique_blocks, (list, tuple, np.ndarray))
    clique_blocks = list(x.clique_blocks) if has_clique else []
    clique_distances = np.asarray(x.clique_distances, dtype=float) if has_clique else np.array([])
    if clique_distances.size:
      # Scale so the closest neighbour sits just beyond the block itself.
      clique_distances = clique_distances / (clique_distances.min() * 0.99)
    blocklist = clique_blocks + [x.BlockCode]
    distance_of = {}
    for block, dist in zip(blocklist, np.append(clique_distances, 1)):
      distance_of.setdefault(block, dist)

    own = df_user_block[(df_user_block["UserId"] == x.UserId) & (df_user_block["BlockCode"].isin(blocklist))]
    total, avg = [], []
    for feature in features:
      values = own[feature]
      total.append(values.sum())
      weighted_sum, used = 0, 0
      for block, value in zip(own["BlockCode"], values):
        # Missing values are skipped like in the total; otherwise one NaN poisons the average.
        if pd.notna(value) and value != 0:
          weighted_sum += value / (distance_of[block] ** 2)
          used += 1
      avg.append(weighted_sum / used if used else 0)
    return np.append(total, avg)

  def clique_features(self, df_user_block, df):
    """
    Add the clique_* and clique_avg_* columns to every row of df_user_block.
    df is the full (user-block) table the neighbourhood values are read from.
    Returns a new frame; an empty df_user_block yields an empty frame with the new columns.
    """
    features = ["block_played", "block_completed", "question_played",
                "question_completed", "video_played", "video_completed", "total_time_spent",
                "total_time_watched", "mean_time_watched"]
    new_columns = ['clique_' + f for f in features] + ['clique_avg_' + f for f in features]
    base = df_user_block.drop(columns=new_columns, errors="ignore")
    if len(base):
      rows = base.apply(self.total_avg_clique, args=(features, df), axis=1)
      values = np.vstack(list(rows))
    else:
      values = np.empty((0, len(new_columns)))
    extra = pd.DataFrame(values, columns=new_columns, index=base.index)
    return pd.concat([base, extra], axis=1)

  # additional features: user oriented and block oriented
  def add_user_block_oriented_ft(self, df_user_block):
    """
    Left-join the user features (prefixed user_ft_) and block features (prefixed block_ft_)
    onto df_user_block. Feature columns that are NaN in more than half of their rows are left out.
    """
    user, block = self.user_ft.copy(), self.block_ft.copy()
    sparse_user = [c for c in user.columns if c != "UserId" and user[c].isna().sum() > len(user) / 2]
    user = user.drop(columns=sparse_user)
    user = user.rename(columns={c: "user_ft_" + c for c in user.columns if c != "UserId"})
    df_user_block = df_user_block.merge(user, on="UserId", how="left")

    block = block.drop(columns=["blocks_unfinished"])
    # NOTE(review): positional rename -- assumes block_ft has exactly these nine remaining
    # columns in this order; pandas raises ValueError on a length mismatch.
    block.columns = ["BlockCode", "block_ft_block_played", "block_ft_block_completed", "block_ft_block_played_mean",
                     "block_ft_no_user_played", "block_ft_no_user_played_rate", "block_ft_block_duration_mean",
                     "block_ft_block_duration_std", "block_ft_no_block_before"]
    sparse_block = [c for c in block.columns if c != "BlockCode" and block[c].isna().sum() > len(block) / 2]
    block = block.drop(columns=sparse_block)
    return df_user_block.merge(block, on="BlockCode", how="left")

  # parallel processing
  def parallelize_dataframe(self, args, func, n_cores=5):
    """
    Run func(*a) for every tuple a in args on a pool of n_cores processes and
    concatenate the returned frames (index is renumbered).
    """
    args = list(args)
    if not args:
      return pd.DataFrame()
    # The context manager tears the pool down even when a worker raises.
    with multiprocessing.Pool(n_cores) as pool:
      parts = pool.starmap(func, args)
    return pd.concat(parts, ignore_index=True)

  def _training_events(self, events, df_session):
    """Events of sessions in self.session, minus the events of the sessions held out of df_session."""
    session_keys = ["UserId", "BlockCode", "AppSessionId"]
    events = events[events["AppSessionId"].isin(self.session["AppSessionId"].unique())]
    held_out = self.session.loc[self.session.index.difference(df_session.index), session_keys]
    if held_out.empty:
      return events
    # Anti-join: the held-out session supplies the label, so its events must not feed the features.
    flagged = events.merge(held_out, on=session_keys, how="left", indicator=True)
    return flagged[flagged["_merge"] == "left_only"].drop(columns="_merge")

  # extract all features
  def extract_all_features(self, latest = True, n_chunks = 1000):
    """
    Build the feature table for every (user-block) pair and write it to CSV.
    Args:
        latest (bool): True -> predicting table over all sessions, extended with never-played
            pairs and restricted to the introduction blocks (written to /dbfs/Truc/predict.csv).
            False -> training table with the latest session of each repeated pair held out and
            its CompletionState attached as "next_attempt" (written to /dbfs/Truc/train.csv).
        n_chunks (int, optional): maximum number of chunks handed to the worker pool
    Returns:
        pd.DataFrame: the feature table
    """
    pair = ["UserId", "BlockCode"]
    if latest:
      df_session, vid, ques = self.session, self.video, self.question
    else:
      df_session = self.remove_latest_info()
      vid = self._training_events(self.video, df_session)
      ques = self._training_events(self.question, df_session)

    # assign() works on a copy, so the caller's video table is left untouched.
    vid = vid.assign(Finished=1 - vid["Skipped"], VideoLengthPlayed=vid["VideoLengthPlayed"].astype(float))
    v = vid.groupby(pair).agg(
      video_played=("Finished", "count"),
      video_completed=("Finished", "sum"),
      total_time_watched=("VideoLengthPlayed", "sum"),
      mean_time_watched=("VideoLengthPlayed", "mean"),
    ).reset_index()
    q = ques.groupby(pair).agg(
      question_played=("CompletionState", "count"),
      question_completed=("CompletionState", "sum"),
    ).reset_index()
    user_block = df_session.groupby(pair).agg(
      block_played=("CompletionState", "count"),
      block_completed=("CompletionState", "sum"),
      total_time_spent=("duration", "sum"),
      avg_time_spent=("duration", "mean"),
    ).reset_index()

    user_block = user_block.merge(v, on=pair, how="left").merge(q, on=pair, how="left")
    user_block["block_completed_rate"] = user_block["block_completed"] / user_block["block_played"]
    user_block["question_completed_rate"] = user_block["question_completed"] / user_block["question_played"]
    user_block["video_completed_rate"] = user_block["video_completed"] / user_block["video_played"]
    user_block["user_and_block"] = user_block["UserId"] + " " + user_block["BlockCode"]

    if latest:
      user_block = self.combine_not_trigger_block(user_block)
    user_block = self.add_user_block_oriented_ft(user_block)
    user_block = user_block.merge(self.block_clique, on="BlockCode", how="left")

    # Neighbourhood values are looked up in the full table, even when only
    # the introduction blocks are scored.
    all_pairs = user_block.copy()
    if latest:
      user_block = user_block[user_block.BlockCode.isin(self.introduction_blocks)].reset_index(drop=True)

    # Never create more chunks than rows: empty chunks only waste worker round-trips.
    n_parts = max(1, min(n_chunks, len(user_block)))
    positions = np.array_split(np.arange(len(user_block)), n_parts)
    args = [(user_block.iloc[idx], all_pairs) for idx in positions]
    user_block = self.parallelize_dataframe(args = args, func=self.clique_features)

    if latest:
      user_block.to_csv("/dbfs/Truc/predict.csv")
    else:
      df_latest = self.latest_info[["UserId", "BlockCode", "CompletionState"]]
      user_block = user_block.merge(df_latest, on=pair, how="inner")
      user_block = user_block.rename(columns={"CompletionState": "next_attempt"})
      user_block.to_csv("/dbfs/Truc/train.csv")
    return user_block

  def run(self):
    """Build both tables; returns (training_data, predicting_data)."""
    self.training_data = self.extract_all_features(latest = False)
    self.predicting_data = self.extract_all_features(latest = True)
    return self.training_data, self.predicting_data
  
  

## Predictor: All methods to train and estimate probability score

In [0]:
class Predictor:
  """
  Train a logistic-regression model on the training table and score the predicting table.

  Args:
    train_data: training features with a numeric "next_attempt" label (from Data)
    predict_data: predicting features with "UserId" and "BlockCode" columns (from Data)

  Construction normalizes the predicting table, trains the model (with forward
  feature selection) and loads the block -> topic mapping.
  """

  def __init__(self, train_data, predict_data):
    self.train_data = train_data
    # NOTE(review): the predicting table is standardized with its own mean/std, not the
    # training statistics -- confirm this train/serve difference is intended.
    self.predict_data = self.normalize_data(data = predict_data)
    self.train_model()
    self.block2topic()

  def block2topic(self):
    """Load the introduction block -> topic mapping from CSV into self.block_topic."""
    df = pd.read_csv("/dbfs/Truc/introduction_blocks_topics.csv")
    self.block_topic = dict(zip(df["introduction_blockcode"], df["topic"]))
    return self.block_topic

  # data normalization
  def normalize_data(self, data):
    """
    Standardize the numeric columns of data (the input frame is not modified).
    - Training data (recognized by a numeric "next_attempt" column) is first stripped of
      outlier rows; the label itself is never normalized.
    - Columns that are NaN in more than half of the rows, or that have no spread, are dropped.
    - Remaining NaNs are replaced by the column mean before scaling to zero mean / unit std.
    Returns:
        pd.DataFrame: normalized copy with a fresh index
    """
    data = data.copy()
    columns = list(data.select_dtypes("number").columns)
    # check if this is training data or predicting data
    if "next_attempt" in columns:
      columns.remove("next_attempt")
      print("before filter: ", len(data))
      # Rows with NaN in avg_time_spent / user_ft_avg_app_session_duration fail the
      # comparison and are removed too; the other columns tolerate NaN explicitly.
      keep = ((data["block_played"] <= 200)
              & ((data["question_played"] <= 1000) | (data["question_played"].isna()))
              & ((data["video_played"] <= 200) | (data["video_played"].isna()))
              & (data["avg_time_spent"] <= 5*3600)
              & (data["user_ft_avg_app_session_duration"] <= 10*3600)
              & ((data["mean_time_watched"] <= 3600) | (data["mean_time_watched"].isna())))
      # The mask has to be applied -- evaluating it alone filters nothing.
      data = data[keep].reset_index(drop=True)
      print("after filter: ", len(data))
    for c in columns:
      if data[c].isna().sum() > len(data)/2:
        data = data.drop(columns=[c])
        continue
      filled = data[c].fillna(data[c].mean())
      std = filled.std()
      if pd.isna(std) or std == 0:
        # A constant (or single-row) column carries no information and cannot be scaled.
        data = data.drop(columns=[c])
      else:
        data[c] = (filled - filled.mean())/std
    return data.reset_index(drop=True)

  # split data for training
  def train_test_split(self, data, validation = True):
    """
    Randomly split data into train/validation/test (validation=True) or train/test.
    When the two classes of "next_attempt" are imbalanced (smaller class below 80% of the
    larger one) the data is first oversampled with SMOTE.
    Returns:
        (train, validation, test) or (train, test)
    """
    n_samples = data.shape[0]
    count = data.next_attempt.value_counts()
    n_neg, n_pos = count.get(0, 0), count.get(1, 0)
    # NOTE(review): oversampling happens before the split, so synthetic rows can reach the test part.
    if n_neg and n_pos and min(n_neg, n_pos) < 0.8 * max(n_neg, n_pos):
      X, y = SMOTE().fit_resample(data.drop(columns=["next_attempt"]), data["next_attempt"])
      data = X
      data["next_attempt"] = y
    sample = data.sample(n=n_samples)
    in_first_part = np.random.rand(len(sample)) < 0.8
    test = sample[~in_first_part]
    if not validation:
      return sample[in_first_part], test
    non_test = sample[in_first_part]
    in_train = np.random.rand(len(non_test)) < 0.7
    return non_test[in_train], non_test[~in_train], test

  #train Logistic Regression model and run feature selection
  def train_model(self):
    """
    Forward stepwise feature selection scored by the mean squared error of the predicted
    probability on the held-out part, followed by a final fit on all normalized training rows.
    Sets self.model and self.best_feature_set (a list of column names).
    """
    data = self.normalize_data(self.train_data).select_dtypes("number")
    train, test = self.train_test_split(data, validation = False)
    y_train, y_test = train["next_attempt"].values, test["next_attempt"].values
    self.model = LogisticRegression(fit_intercept=True, max_iter=200)

    all_features = [c for c in train.columns if c != "next_attempt"]
    predict_data = getattr(self, "predict_data", None)
    if predict_data is not None:
      # Only features that survived normalization of the predicting table can be scored later.
      all_features = [c for c in all_features if c in predict_data.columns]
    if not all_features:
      raise ValueError("no numeric feature is available for training")

    def holdout_error(cols):
      self.model.fit(train[cols].values, y_train)
      return mean_squared_error(y_test, self.model.predict_proba(test[cols].values)[:, 1])

    # forward stepwise feature selection
    candidates = []
    selected = []
    for _ in range(1, len(all_features)):
      remaining = [c for c in all_features if c not in selected]  # ordered -> reproducible tie-breaks
      errors = [holdout_error(selected + [c]) for c in remaining]
      best = int(np.argmin(errors))
      selected = selected + [remaining[best]]
      candidates.append((selected, errors[best]))
    candidates.append((list(all_features), holdout_error(list(all_features))))

    best_feature_set, _ = min(candidates, key=lambda t: t[1])
    #train on the best feature set
    self.model = LogisticRegression(fit_intercept=True, max_iter=200).fit(data[best_feature_set], data["next_attempt"])
    self.best_feature_set = best_feature_set
    return self.model

  # estimate probability score for every (user-block)
  def predict(self):
    """
    Score every row of the predicting table and attach its topic.
    Returns:
        pd.DataFrame: UserId, BlockCode, next_time_rate, topic (also kept in self.block_scoring
        and written to /dbfs/Truc/score.csv)
    Raises:
        KeyError: a scored block has no entry in the block -> topic mapping
    """
    scores = self.model.predict_proba(self.predict_data[self.best_feature_set])[:, 1]
    scoring = self.predict_data[["UserId", "BlockCode"]].copy()  # copy: do not write into a slice
    scoring["next_time_rate"] = scores
    unknown = sorted(set(scoring["BlockCode"]) - set(self.block_topic))
    if unknown:
      raise KeyError("no topic defined for block(s): {}".format(", ".join(map(str, unknown))))
    scoring["topic"] = scoring["BlockCode"].map(self.block_topic)
    self.block_scoring = scoring
    self.block_scoring.to_csv("/dbfs/Truc/score.csv")
    return self.block_scoring

  def get_reccomend_topic(self, userid):
    """Return the user's (BlockCode, next_time_rate) pairs, highest score first. Requires predict() to have run."""
    df = self.block_scoring[self.block_scoring["UserId"]==userid]
    return sorted(zip(df["BlockCode"], df["next_time_rate"]), key=lambda x: x[1], reverse=True)

  def testing_method(self):
    """Return UserId, BlockCode and the hard class prediction (as next_time_rate) for the predicting table."""
    scores = self.model.predict(self.predict_data[self.best_feature_set])
    block_scoring = self.predict_data[["UserId", "BlockCode"]].copy()
    block_scoring["next_time_rate"] = scores
    return block_scoring

## Generator: Generate score table. This table is saved in the database

In [0]:
# Generate the scoring table for all users. This table is re-generated every two days
# (the model is re-trained every two days).
def generate_scoring_table(block_started, block_finished, video_played, question_played, user_features, block_features):
  """
  Build the training and predicting feature tables from the event and feature
  tables, train a Predictor on them and return its scoring table.
  """
  feature_builder = Data(
    user_ft = user_features,
    block_ft = block_features,
    block_start = block_started,
    block_finish = block_finished,
    question = question_played,
    video = video_played,
  )
  # Training table first, then the predicting table.
  train_table = feature_builder.extract_all_features(latest = False)
  predict_table = feature_builder.extract_all_features(latest = True)
  predictor = Predictor(train_data = train_table, predict_data = predict_table)
  return predictor.predict()

# Specification handed to FeaturesTableGenerator: where the result is written,
# which input tables are loaded (and under which alias), and which function builds the table.
SCORING_TABLE = {
    "outputSpecs": {
        "tableName": "scoring_table",
        "indexColumns": ["UserId", "topic"],
        "ifExists": "replace",
    },
    # Every input shares the same "type"; only the alias and the source table differ.
    "inputSpecs": [
        {"alias": alias, "type": "delta_table_to_pandas", "queryOrTable": table}
        for alias, table in (
            ("block_started", "norm_events_block_started"),
            ("block_finished", "norm_events_block_finished"),
            ("video_played", "norm_events_video_played"),
            ("question_played", "norm_events_question_played"),
            ("user_features", "analyzed_features_per_user"),
            ("block_features", "analyzed_features_per_block"),
        )
    ],
    "featureSpecs": [
        {
            "name": "generate_scoring_table",
            "function": generate_scoring_table,
            # Aliases are listed in the parameter order of generate_scoring_table.
            "functionInputs": [
                "block_started",
                "block_finished",
                "video_played",
                "question_played",
                "user_features",
                "block_features",
            ],
            "isTimeWindowFunction": False,
        },
    ],
}
process = FeaturesTableGenerator(SCORING_TABLE)
process.run()