In [1]:
# import libraries
import os
import re
import copy
import time
import datetime
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.sql.types import IntegerType, ArrayType, FloatType, DoubleType, Row, DateType
from pyspark.ml.linalg import DenseVector, SparseVector
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


%matplotlib inline

In [2]:
# # This cell is used on AWS EMR

# import os
# import re
# import sys
# print("start SPARK application")
# os.environ["SPARK_HOME"] = "/usr/lib/spark/"
# os.environ["PYSPARK_PYTHON"] = "/mnt/anaconda3/bin/python3.7"
# spark_home = os.environ.get('SPARK_HOME', None)
# sys.path.insert(0, spark_home + "/python")
# sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-src.zip'))

In [3]:
# # This cell is used on AWS EMR
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("yarn") \
#     .config("spark.dynamicAllocation.enabled", "true") \
#     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
#     .config("spark.shuffle.spill.compress", "true") \
#     .config("spark.shuffle.service.enabled", "true") \
#     .config("spark.io.compression.codec", "snappy") \
#     .config("spark.driver.memory", "24g") \
#     .config("spark.driver.cores", "4") \
#     .config("spark.executor.cores", "4") \
#     .config("spark.executor.memory", "24g") \
#     .config("spark.kryoserializer.buffer.max", "2000m") \
#     .config("spark.network.timeout", "360000") \
#     .config("spark.dynamicAllocation.minExecutors", "25")\
#     .enableHiveSupport() \
#     .getOrCreate()
# spark.conf.set("spark.sql.broadcastTimeout", 72000)

In [4]:
# df = spark.read.json('s3://<bucket-name>/ElifSurmeli/' + event_data)

In [5]:

# pd.DataFrame(df.take(5), columns=df.columns).head()

In [6]:
pd.set_option('display.max_columns', None)  
pd.set_option('display.expand_frame_repr', False)
pd.set_option('max_colwidth', -1)

  pd.set_option('max_colwidth', -1)


In [7]:
# create a Spark session
spark = SparkSession.builder.master('local[*]').appName("Sparkify").getOrCreate()
spark

In [8]:
path = './mini_sparkify_event_data.json'

df = spark.read.json(path)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [9]:
def cleaning_data(df):
    """
    Casts miliseconds ts column and creates two new columns with timestamptype and datetype,
    which will be used for further processing
    Extracts days from ts
    Casts user id to LongType
    Splits location field and takes only state name as location.
        -   parameters: df (spark DataFrame)
        -   returns: df preprocessed Spark DataFrame
    """
    # Changing userId column to Longtype and removing null from userId
    df = df.withColumn('userId', f.col('userId').cast(LongType()))
    df = df.filter(f.col('userId').isNotNull())
    
    # Removing LoggedOut from column auth.
    df = df.filter(f.col('auth')!='LoggedOut')

    # taking only the state of location
    df = df.withColumn('location', f.split(f.col('location'),',').getItem(1))

    # Convert the timestamps in columns 'registration' and 'ts' into datetime format and add them to the dataframe
    df = df.withColumn('registrationDate', (df.registration / 1000).cast(TimestampType()))
    df = df.withColumn('tsDate', (df.ts / 1000).cast(TimestampType()))

    return df

In [10]:
def churns_data(df):
    """
    Creating label column based on churn of user.
    -   parameters: df (spark DataFrame)
    -   returns: df with calculated column
    """
    # defining Churn data
    label_df = df.withColumn('label', when((col('page').isin(['Cancellation Confirmation','Cancel'])) | (col('auth')=='Cancelled'),1 ).otherwise(0))\
        .groupBy('userId').agg(sum('label').alias('label')).withColumn('label', when(col('label')>=1 ,1).otherwise(0))

    df = df.join(label_df, on='userId')
   
    return df

In [11]:
def days_on_platform(df):
    """
    Calculate the time that user in registered on the platform based on registration date and last interaction
    -   parameters: df (spark DataFrame)
    -   returns: df with who many days the user has been registered on the platform.
    """
    
    #crating a column of last interaction per user
    last_interaction_df =  df.groupBy('userId').agg(max('ts').alias('lastIteraction'))
    df = last_interaction_df.join(df, on= 'userId', how='left').withColumn('registeredDays', ((f.col('lastIteraction')-f.col('registration'))/86400000).cast(IntegerType()))

    return df

In [12]:
def last_level(df):
    """
    Search the latest level of each user is was paid or free tier.
    -   parameters: df (spark DataFrame)
    -   returns: df with latest level assigned to each user
    """
    level_df = df.orderBy('ts', ascending=False).groupBy('userId').agg(first('level').alias('last_level'))
    
    df = df.drop('level')
    df = df.join(level_df, on='userId')
    
    return df

In [13]:
def mean_length(df):
    """
    Finds the averages for each length per users by day.
    -  parameters: df (spark DataFrame)
    -  returns: df with average length column DataFrame
    """
    avg_length_df = df.groupBy('userId').avg('length').withColumnRenamed('avg(length)', 'length')

    df = df.drop('length')
    df = df.join(avg_length_df, on='userId')

    return df

In [14]:
def agg_sessionid(df):
    """
    Calculates daily average of distinct sessionId for each user
    -  parameters: df (spark DataFrame)
    -  returns: daily and monthly aggregates DataFrame
    """
    daily_session_df = df.groupby('userId','tsDate').agg(countDistinct('sessionId')).\
        groupBy('userId').avg('count(sessionId)').\
        withColumnRenamed('avg(count(sessionId))', 'avg_daily_sessions')

    return daily_session_df

In [15]:
def mean_items(df):
    """
    Finds the averages for each item per users by day. 
    -  parameters: df (spark DataFrame)
    -  returns: df with daily averages DataFrame
    """
    items_per_day_df = df.groupby('userId','tsDate').agg(max('itemInSession')).\
    groupBy('userId').avg('max(itemInSession)').\
    withColumnRenamed('avg(max(itemInSession))', 'avg_daily_items')

    return items_per_day_df

In [16]:
def duration_session(df):
    """
    Finds the averages for each item per users by day.
    -  parameters: df (spark DataFrame)
    -  returns: df with daily duration session DataFrame
    """
    #Calculates daily average of distinct sessionId for each user
    duration_session_per_day_df = df.groupby('userId','tsDate','sessionId').agg(max('ts').alias('session_end'), min('ts').alias('session_start')).withColumn('session_duration_sec', (col('session_end')-col('session_start'))*0.001).\
        groupby('userId','tsDate').avg('session_duration_sec').groupby('userId').agg(mean('avg(session_duration_sec)').alias('avg_daily_session_duration')).orderBy('userId', ascending=False)
    
    return duration_session_per_day_df 


In [17]:
def pages_aggregation(df):
    """
    Finds the averages for each page event per users by day 
    -  parameters: df (spark DataFrame)
    -  returns: df with daily duration session DataFrame
    """
    unique_pages = [row.page for row in df.select('page').distinct().collect()]
    unique_pages.remove('Cancel')
    unique_pages.remove('Cancellation Confirmation')
        
    page_event_per_day_df = df.groupby('userId','tsDate').pivot('page').count()
    exp_dict={}
    for page in unique_pages:
        exp_dict.update({page:'mean'})

    page_event_per_day_df = page_event_per_day_df.join(page_event_per_day_df.groupBy('userId').agg(exp_dict).fillna(0), on='userId')

    for page in unique_pages:
        page_event_per_day_df = page_event_per_day_df.drop(page)  
        page_event_per_day_df = page_event_per_day_df.withColumnRenamed('avg({})'.format(page), 'avg_daily_{}'.format(page))

    page_event_per_day_df = page_event_per_day_df.drop('Cancel','Cancellation Confirmation','tsDate').drop_duplicates()

    return page_event_per_day_df

In [18]:
def join_features(df, session_per_user_df, duration_session_per_day_df, items_per_day_df, page_event_per_day_df):
    """
    #Joins all feature engineering dataframes and main dataframe for couple of original columns
    #artist, song, method, status, userAgent have been removed to avoid complexity
    #firstName, lastName has been removed as they're redundant.user id sufficient to identify customer
    -   parameters:  main DF, df_seesion_per_user, df_duration_per_day, df_items per_day, df_page_event_per_day
    -   returns: df with all features ready to be processed
    """
    
    
    all_aggs_df =\
        session_per_user_df\
        .join(duration_session_per_day_df, on= 'userId')\
        .join(items_per_day_df, on='userId')\
        .join(page_event_per_day_df, on='userId')

    df = df.drop('auth', 'level','userAgent','tsDate','interaction_time','registration', 'ts','song','page','itemInSession','sessionId','artist','firstName','lastName','method','status')
    joined_df = all_aggs_df.join(df, on='userId')
        
    joined_df = joined_df.drop_duplicates()
    df_features = joined_df.drop('userId', 'tsDate', 'registrationDate')

    return df_features

In [19]:
def build_pipeline(num_cols):
    """
    Process all categorical and text columns with string indexer. 
    Process all numerical columns with Vector assembler.
    Creates a pipeline with indexer and assembler to process all columns at once.
    -   parameters: list of numerical column names
    -   returns: data processed by the pipeline
    """
    
    indexer_gender = StringIndexer(inputCol='gender', outputCol='gender_index')
    indexer_location = StringIndexer(inputCol='location', outputCol='location_index')
    indexer_last_level = StringIndexer(inputCol='last_level', outputCol='last_level_index')

    assembler = VectorAssembler(inputCols=num_cols, outputCol='features')

    process_pipeline = Pipeline(stages=[indexer_gender, indexer_location, indexer_last_level, assembler])

    return process_pipeline

In [20]:
def post_process_data(df_features):
    """
    Transforms features with feature pipeline
    :param features_df: Spark DataFrame of features
    :return Spark Dataframe with label and assembled features
    """
    num_cols = []
    
    for field in df_features.schema.fields :
        if field.dataType!=StringType():
            num_cols.append(field.name)

    num_cols.remove('label')

    process_pipeline = build_pipeline(num_cols)
    model_df = process_pipeline.fit(df_features).transform(df_features)
    
    return model_df

In [21]:
def fit_predict(train, test, model):
    """
    This function fits training and runs predictions with given training and test datasets and displays results for given model
    -   parameters train: Spark dataframe with training data
    -   parameters test:  Spark data frame with testing data
    -   parameters model : model name as string, either 'logistic_regression', 'random_forest' or 'gradient_boosting'
    -   returns: A classification model trained, and the results. 
    """

    if model == 'logistic_regression':
        ml = LogisticRegression()
    elif model == 'random_forest':
        ml = RandomForestClassifier()
    elif model == 'gradient_boosting':
        ml = GBTClassifier()
    else:
        return "Please choose an appropriate model"
    
    # Fit and calculate predictions
    clf = ml.fit(train)
    results = clf.transform(test)

    print(compare_models({model : results}))

    return clf, results

In [22]:
def compare_models(list_prediction):
  """
  This will compare the results of each model based in: confusion matrix, accuracy, precision, recall, f1-score and AUC
  -  parameters: list of results of each model
  -  returns: Confusion Matrix, Accuracy, Precision, Recall, F1 and AUC.
  """
  # ‘s’ would be the returned string will collect and build the confusion matrix and also the metrics (accuracy, precision, recall and F1-score)
  
  s = '\n'

  for model, df_transform_model in list_prediction.items():

    s += '=' * 50 + '\n' 
    s += model + ':\n'
    s += '-' * 50 + '\n'
    s += ' \n'

    # Creating the Confusion Matrix: tp = true positive, tn = true negative, fp = false positive, fn = false negative
    tp = df_transform_model.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 1)).count()
    tn = df_transform_model.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 0)).count()
    fp = df_transform_model.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 1)).count()
    fn = df_transform_model.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 0)).count()

    # Ploting the Confusion Matrix.  
    s += ' '*25 + 'Predict:\n'
    s += ' '*18 +  'Churn:' + ' '*5 + 'Not-Churn:\n'
    s += ' '*8 + 'Churn:' + ' '*7 +  str(int(tp)) + ' '*11 + str(int(fn)) + '\n'
    s += 'Real:\n'
    s += ' '*8 + 'Not-Churn:' + ' '*3 + str(int(fp)) +  ' '*11 + str(int(tn))  + '\n'
    s += '\n'

    # Evaluation metric using: MulticlassClassificationEvaluator and BinaryClassificationEvaluator 
    evaluator = MulticlassClassificationEvaluator()
    auc_evaluator = BinaryClassificationEvaluator()
    f1_score_evaluator = MulticlassClassificationEvaluator(metricName='f1')
    f1_score = f1_score_evaluator.evaluate(df_transform_model.select(col('label'), col('prediction')))

    s += f'Accuracy: {evaluator.evaluate(df_transform_model, {evaluator.metricName: "accuracy"})*100:.2f}%\n'
    s += f'Precision: {evaluator.evaluate(df_transform_model, {evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1})*100:.2f}%\n'
    s += f'Recall: {evaluator.evaluate(df_transform_model, {evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1})*100:.2f}%\n'
    s += f'F1: {f1_score:.2%}%\n'
    s += f'AUC: {auc_evaluator.evaluate(df_transform_model, {auc_evaluator.metricName: "areaUnderROC"})*100:.2f}%\n' 


  return s

In [23]:
def evaluation_model(results):
    """
    This function evaluate the model trained with 2 metrics f1 and AUC
    -   parameters results: Spark DataFrame, model prediction outputs
    -   returns: Print the metrics f1 and AUC (areaUnderROC).
    """

    f1_score_evaluator = MulticlassClassificationEvaluator(metricName='f1')
    f1_score = f1_score_evaluator.evaluate(results.select(col('label'), col('prediction')))
    print('The F1 score on the test set is {:.2%}'.format(f1_score)) 
    print('')
    auc_evaluator = BinaryClassificationEvaluator()
    metric_value = auc_evaluator.evaluate(results, {auc_evaluator.metricName: "areaUnderROC"})
    print('The areaUnderROC on the test set is {:.2%}'.format(metric_value))

In [24]:
def tuned_randomforest(train, test, numTrees=[75, 100], maxDepth=[10,20]):
    """
    This function tunes the random forest, searching for the best parameters for the model using gridsearch to find numTrees and maxDepth, evaluates the results and prints best parameters
    -   parameter train: training data
    -   parameter test: test data
    -   parameter numTrees: List of integers,Number of trees in the random forest
    -   param maxDepth: List of integers, Maximum depth of the tree
    -   returns: Prints the best features based on importance.
    """
    clf = RandomForestClassifier()
    
    paramGrid = ParamGridBuilder() \
        .addGrid(clf.numTrees, numTrees) \
        .addGrid(clf.maxDepth, maxDepth) \
        .build()   
    
    crossval = CrossValidator(estimator = Pipeline(stages=[clf]),
                         estimatorParamMaps = paramGrid,
                         evaluator = MulticlassClassificationEvaluator(metricName='f1'),
                         numFolds = 3)

    cvModel = crossval.fit(train)
    predictions = cvModel.transform(test)
    
    evaluation_model(predictions)
    
    bestPipeline = cvModel.bestModel

    print('======FEATURES=========IMPORTANCE=========')
    # prints feature importances
    for i in range(len(bestPipeline.stages[0].featureImportances)):
        print(f"{features_df.columns[i]} : {bestPipeline.stages[0].featureImportances[i]} \n")
                                                                                                                                                                                                                                                                                                                                                                                                                                                           

In [25]:
df = cleaning_data(df)                 

In [26]:
df = churns_data(df)
df = days_on_platform(df)
df = last_level(df)
df = mean_length(df)
session_per_user_df = agg_sessionid(df)
items_per_day_df = mean_items(df)
duration_session_per_day_df = duration_session(df)
page_event_per_day_df = pages_aggregation(df)

features_df = join_features(df, session_per_user_df, items_per_day_df, duration_session_per_day_df, page_event_per_day_df)


In [27]:
features_df.show(2)

+------------------+------------------+--------------------------+------------------+--------------------+---------------------+-------------------+------------------------+---------------------+------------------+---------------+---------------+-----------------+-------------------------+--------------+-------------------+----------------+--------------------------+-----------------------+--------------+--------------+------+--------+-----+--------------+----------+------------------+
|avg_daily_sessions|   avg_daily_items|avg_daily_session_duration|avg_daily_Settings|avg_daily_Add Friend|avg_daily_Thumbs Down|avg_daily_Downgrade|avg_daily_Submit Upgrade|avg_daily_Roll Advert|avg_daily_NextSong|avg_daily_Error|avg_daily_About|avg_daily_Upgrade|avg_daily_Add to Playlist|avg_daily_Home|avg_daily_Thumbs Up|avg_daily_Logout|avg_daily_Submit Downgrade|avg_daily_Save Settings|avg_daily_Help|lastIteraction|gender|location|label|registeredDays|last_level|            length|
+-----------------

In [28]:
pd.DataFrame(features_df.take(5), columns=features_df.columns).head()

Unnamed: 0,avg_daily_sessions,avg_daily_items,avg_daily_session_duration,avg_daily_Settings,avg_daily_Add Friend,avg_daily_Thumbs Down,avg_daily_Downgrade,avg_daily_Submit Upgrade,avg_daily_Roll Advert,avg_daily_NextSong,avg_daily_Error,avg_daily_About,avg_daily_Upgrade,avg_daily_Add to Playlist,avg_daily_Home,avg_daily_Thumbs Up,avg_daily_Logout,avg_daily_Submit Downgrade,avg_daily_Save Settings,avg_daily_Help,lastIteraction,gender,location,label,registeredDays,last_level,length
0,1.0,27.825871,0.0,1.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,1542955611000,M,WA,0,72,free,253.560581
1,1.0,137.758755,0.0,1.0,1.0,1.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0,1539159711000,F,OH,0,21,paid,252.226546
2,1.0,15.125,0.0,0.0,1.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1542993707000,F,OH-KY-IN,0,61,free,264.422171
3,1.0,184.296004,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1543415437000,F,FL,0,82,paid,249.587069
4,1.0,30.305638,0.0,1.0,1.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,1.0,1543515959000,M,CA,0,77,paid,255.828061


In [29]:
#features_df.select([count(when(isnull(c), c)).alias(c) for c in features_df.columns]).show()

In [30]:
features_df.printSchema()

root
 |-- avg_daily_sessions: double (nullable = true)
 |-- avg_daily_items: double (nullable = true)
 |-- avg_daily_session_duration: double (nullable = true)
 |-- avg_daily_Settings: double (nullable = false)
 |-- avg_daily_Add Friend: double (nullable = false)
 |-- avg_daily_Thumbs Down: double (nullable = false)
 |-- avg_daily_Downgrade: double (nullable = false)
 |-- avg_daily_Submit Upgrade: double (nullable = false)
 |-- avg_daily_Roll Advert: double (nullable = false)
 |-- avg_daily_NextSong: double (nullable = false)
 |-- avg_daily_Error: double (nullable = false)
 |-- avg_daily_About: double (nullable = false)
 |-- avg_daily_Upgrade: double (nullable = false)
 |-- avg_daily_Add to Playlist: double (nullable = false)
 |-- avg_daily_Home: double (nullable = false)
 |-- avg_daily_Thumbs Up: double (nullable = false)
 |-- avg_daily_Logout: double (nullable = false)
 |-- avg_daily_Submit Downgrade: double (nullable = false)
 |-- avg_daily_Save Settings: double (nullable = false)
 

In [31]:
#pd_features = features_df.toPandas()

In [32]:
#fig = plt.figure(figsize=(20,15))
#ax = fig.gca()
#h = pd_features.hist(ax=ax)

In [33]:
model_data = post_process_data(features_df)

In [34]:
# Split the data into train and test subsets
train, test = model_data.randomSplit([0.8, 0.2], seed=9)

In [37]:
# Fit various models and visualize their accuracies
for model in ['logistic_regression', 'random_forest', 'gradient_boosting']:
    fit_predict(train, test, model)


logistic_regression:
--------------------------------------------------
 
                         Predict:
                  Churn:     Not-Churn:
        Churn:       7           3
Real:
        Not-Churn:   3           40

Accuracy: 88.68%
Precision: 70.00%
Recall: 70.00%
F1: 88.68%%
AUC: 86.28%


random_forest:
--------------------------------------------------
 
                         Predict:
                  Churn:     Not-Churn:
        Churn:       7           3
Real:
        Not-Churn:   2           41

Accuracy: 90.57%
Precision: 77.78%
Recall: 70.00%
F1: 90.37%%
AUC: 94.19%


gradient_boosting:
--------------------------------------------------
 
                         Predict:
                  Churn:     Not-Churn:
        Churn:       8           2
Real:
        Not-Churn:   8           35

Accuracy: 81.13%
Precision: 50.00%
Recall: 80.00%
F1: 82.60%%
AUC: 90.00%



In [36]:
tuned_rf = tuned_randomforest(train, test)

The F1 score on the test set is 87.03%

The areaUnderROC on the test set is 93.26%
avg_daily_sessions : 0.0 

avg_daily_items : 0.08744437069036085 

avg_daily_session_duration : 0.0 

avg_daily_Settings : 0.016216111818598958 

avg_daily_Add Friend : 0.009976415593522693 

avg_daily_Thumbs Down : 0.014341155220435609 

avg_daily_Downgrade : 0.02593551061943221 

avg_daily_Submit Upgrade : 0.015201497735093291 

avg_daily_Roll Advert : 0.02609169877205655 

avg_daily_NextSong : 0.0 

avg_daily_Error : 0.02030323144682147 

avg_daily_About : 0.01830046980787026 

avg_daily_Upgrade : 0.013747358340235882 

avg_daily_Add to Playlist : 0.004889348394876002 

avg_daily_Home : 0.009852658058802892 

avg_daily_Thumbs Up : 0.003800499176882964 

avg_daily_Logout : 0.006566081731321905 

avg_daily_Submit Downgrade : 0.018245264360672777 

avg_daily_Save Settings : 0.023218183420334174 

avg_daily_Help : 0.011735578521987537 

lastIteraction : 0.4190742496348529 

gender : 0.1698534558205737 

l