# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [22]:
# import libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, avg, desc,countDistinct, count, when, concat, lit
from pyspark.sql.types import IntegerType, DateType

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import datetime

import warnings
warnings.filterwarnings('ignore')
%matplotlib inline

from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.sql import Window
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [23]:
# Build (or reuse) the local Spark session named "Sparkify" that the later cells rely on.
session_builder = SparkSession.builder.master("local").appName("Sparkify")
spark = session_builder.getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [24]:
# Read the mini Sparkify event log (JSON) into a Spark DataFrame and show its first row.
df = spark.read.json("mini_sparkify_event_data.json")
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [25]:
print((df.count(), len(df.columns)))

(286500, 18)


In [26]:
print("columns Present in data set in the dataframe{}".format(df.columns))

columns Present in data set in the dataframe['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'method', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId']


In [27]:
df.describe().toPandas()

Unnamed: 0,summary,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,count,228108,286500,278154,278154,286500.0,278154,228108.0,286500,278154,286500,286500,278154.0,286500.0,228108,286500.0,286500.0,278154,286500.0
1,mean,551.0852017937219,,,,114.41421291448516,,249.1171819778458,,,,,1535358834084.4272,1041.526554973822,Infinity,210.05459685863875,1540956889810.4834,,59682.02278593872
2,stddev,1217.7693079161374,,,,129.76726201140994,,99.2351792105836,,,,,3291321616.327586,726.7762634630741,,31.50507848842214,1507543960.8226302,,109091.9499991047
3,min,!!!,Cancelled,Adelaida,F,0.0,Adams,0.78322,free,"Albany, OR",GET,About,1521380675000.0,1.0,ÃÂg ÃÂtti GrÃÂ¡a ÃÂsku,200.0,1538352117000.0,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10)...",
4,max,ÃÂlafur Arnalds,Logged Out,Zyonna,M,1321.0,Wright,3024.66567,paid,"Winston-Salem, NC",PUT,Upgrade,1543247354000.0,2474.0,ÃÂau hafa sloppiÃÂ° undan ÃÂ¾unga myrkursins,404.0,1543799476000.0,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT ...,99.0


In [28]:
type(df)

pyspark.sql.dataframe.DataFrame

In [29]:
df.describe().toPandas().head()

Unnamed: 0,summary,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,count,228108,286500,278154,278154,286500.0,278154,228108.0,286500,278154,286500,286500,278154.0,286500.0,228108,286500.0,286500.0,278154,286500.0
1,mean,551.0852017937219,,,,114.41421291448516,,249.1171819778458,,,,,1535358834084.4272,1041.526554973822,Infinity,210.05459685863875,1540956889810.4834,,59682.02278593872
2,stddev,1217.7693079161374,,,,129.76726201140994,,99.2351792105836,,,,,3291321616.327586,726.7762634630741,,31.50507848842214,1507543960.8226302,,109091.9499991047
3,min,!!!,Cancelled,Adelaida,F,0.0,Adams,0.78322,free,"Albany, OR",GET,About,1521380675000.0,1.0,ÃÂg ÃÂtti GrÃÂ¡a ÃÂsku,200.0,1538352117000.0,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10)...",
4,max,ÃÂlafur Arnalds,Logged Out,Zyonna,M,1321.0,Wright,3024.66567,paid,"Winston-Salem, NC",PUT,Upgrade,1543247354000.0,2474.0,ÃÂau hafa sloppiÃÂ° undan ÃÂ¾unga myrkursins,404.0,1543799476000.0,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT ...,99.0


In [30]:
#lets look at schema of data :
df.printSchema();

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [31]:
#df.toPandas().isnull().sum()

In [32]:
# Remove events whose userId or sessionId is null. The describe() counts above report
# 286500 non-null values for both columns, so no rows are expected to be dropped here.
df = df.na.drop(how="any", subset=["userId", "sessionId"])

In [33]:
# Keep only events that carry a non-empty userId string.
df = df.where(col("userId") != "")

In [34]:
# df.write.csv('test_20191218.csv')

In [35]:
print((df.count(), len(df.columns)))

(278154, 18)


# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [36]:
# Count how many events were logged for each page type.
page_event_counts = df.groupby('page').agg({'page': 'count'})
page_event_counts.select('page', 'count(page)').show()

+--------------------+-----------+
|                page|count(page)|
+--------------------+-----------+
|              Cancel|         52|
|    Submit Downgrade|         63|
|         Thumbs Down|       2546|
|                Home|      10082|
|           Downgrade|       2055|
|         Roll Advert|       3933|
|              Logout|       3226|
|       Save Settings|        310|
|Cancellation Conf...|         52|
|               About|        495|
|            Settings|       1514|
|     Add to Playlist|       6526|
|          Add Friend|       4277|
|            NextSong|     228108|
|           Thumbs Up|      12551|
|                Help|       1454|
|             Upgrade|        499|
|               Error|        252|
|      Submit Upgrade|        159|
+--------------------+-----------+



In [37]:
# Distinct ids of the users who reached the 'Cancellation Confirmation' page.
cancellation_events = df.where(col('page') == 'Cancellation Confirmation')
churned_user_ids = [
    row['userId']
    for row in cancellation_events.select('userId').distinct().collect()
]

In [38]:
# Label every event of a churned user with churn = 1; all remaining events get churn = 0.
is_churned_user = col("userId").isin(churned_user_ids)
df = df.withColumn('churn', when(is_churned_user, 1).otherwise(0))

In [None]:
# Register df as the temp view 'data' so that the following cells can query it with Spark SQL.
df.createOrReplaceTempView('data');

In [None]:
# Spark SQL: number of distinct users in each churn group, and the resulting churn rate.
df_churn = spark.sql("""
              SELECT
                  churn,
                  count(distinct userId) as unique_user_count
                FROM
                    data
                GROUP BY
                    churn
                """)
df_churn.show()

# Collect the (at most two) result rows once instead of running a separate filter/first
# job per group. churn is an integer column, so it is looked up with integer keys, and a
# group that has no users counts as 0 instead of raising on a missing row.
users_by_churn = {row['churn']: row['unique_user_count'] for row in df_churn.collect()}
n_users_churn = users_by_churn.get(1, 0)
n_users_remain = users_by_churn.get(0, 0)

# Guard the ratio so that an empty result does not raise ZeroDivisionError.
n_users_total = n_users_churn + n_users_remain
churn_rate = round(n_users_churn / n_users_total, 3) if n_users_total else float('nan')

print('The number of churn user: {} with a churn rate of: {}'\
                             .format(n_users_churn, churn_rate))

In [None]:
# Alternatively, 'Cancelled' in the 'auth' column also seems to represent churn.
# Count the distinct users seen under each auth state to understand the data a bit more.
df_auth = spark.sql("""
            SELECT 
                auth, 
                count(distinct userId) as unique_user_count
            FROM 
                data
            WHERE 
                userId IS NOT NULL AND userID != ''
            GROUP BY 
                auth
            ORDER BY 
                unique_user_count DESC
            """)

df_auth.show()

# Collect the small result once and look the two states up by name; a state that does
# not occur counts as 0 users instead of raising on a missing row.
users_by_auth = {row['auth']: row['unique_user_count'] for row in df_auth.collect()}
n_users_registered = users_by_auth.get('Logged In', 0)
n_users_cancelled = users_by_auth.get('Cancelled', 0)

# Guard the ratio so that an empty 'Logged In' group does not raise ZeroDivisionError.
if n_users_registered:
    auth_churn_rate = round(n_users_cancelled / n_users_registered, 3)
else:
    auth_churn_rate = float('nan')

print('The number of users registered: {} and the number of users left: {} with a churn rate of: {}'\
                             .format(n_users_registered, n_users_cancelled, auth_churn_rate))

In [None]:
# Average number of songs played per user, for churned and unchurned users.
# Only 'NextSong' events count as song plays.
song_events = df.filter(col('page') == 'NextSong')

# Total number of song plays per churn group (column 'count').
df_songs_count = song_events.groupby(col('churn')).count()

# Number of distinct users with at least one song play per churn group
# (column 'count(DISTINCT userId)').
df_songs_unique_users = song_events.groupby(col('churn')).agg(countDistinct(col('userId')))

# Join on the column name: both frames derive from the same DataFrame, so an explicit
# `a.churn == b.churn` condition compares a column reference with itself and the
# follow-up drop() has to pick between two identically named columns. Joining with
# on='churn' keeps exactly one 'churn' column.
avg_num_songs_played = df_songs_unique_users.join(df_songs_count, on='churn')

avg_num_songs_played = avg_num_songs_played\
                .withColumn('AvgNumberofSongs', \
                avg_num_songs_played['count']/avg_num_songs_played['count(DISTINCT userId)'])

In [None]:
# Bar chart: average number of songs per user in each churn group.
df_plt = avg_num_songs_played.select('churn', 'AvgNumberofSongs').toPandas()
songs_ax = sns.barplot(x="churn", y="AvgNumberofSongs", data=df_plt)
songs_ax.set_xlabel("churn", fontsize=14)
songs_ax.set_ylabel("Average Number of Songs Played", fontsize=12)
songs_ax.set_title("Average Number of songs played:\nchurned users vs unchurned users", fontsize=14);

In [None]:
# Re-draws the average-songs bar chart from the existing df_plt frame.
# NOTE(review): this cell repeats the plotting calls of the previous cell; confirm whether it is still needed.
sns.barplot(x="churn", y="AvgNumberofSongs", data = df_plt)
plt.xlabel("churn", fontsize=14);
plt.ylabel("Average Number of Songs Played", fontsize=12);
plt.title("Average Number of songs played:\nchurned users vs unchurned users", fontsize=14);

In [None]:
# Average number of times each page is visited per user: churned vs unchurned users.
# Numerator: number of events per (churn, page) pair (column 'count(page)').
df_page_churn_count = df.groupby(['churn','page']).agg({'page':'count'})

# Denominator: all distinct users in each churn group. It is computed from the full
# event frame so that users without any 'NextSong' event are also counted, and so that
# this cell does not depend on an intermediate frame of the songs analysis.
users_per_churn = df.groupby('churn')\
                .agg(countDistinct(col('userId')).alias('count(DISTINCT userId)'))

# Join on the column name so that exactly one 'churn' column remains in the result.
avg_num_page_visit = df_page_churn_count.join(users_per_churn, on='churn')

avg_num_page_visit = avg_num_page_visit\
                .withColumn('AvgNumberofPageVisit', \
                            avg_num_page_visit['count(page)']/avg_num_page_visit['count(DISTINCT userId)'])

In [None]:
# 'NextSong' dominates every other page and already has its own plot, so it is excluded
# here; the remaining rows are handed to pandas for plotting.
avg_num_page_visit = avg_num_page_visit.where(col('page') != 'NextSong').toPandas()

In [None]:
# Plot the average number of visits per user for each page: churned vs unchurned users.
fig = plt.figure(figsize=(15, 4))
sns.barplot(x="page",y="AvgNumberofPageVisit", hue="churn", data=avg_num_page_visit, hue_order=[1, 0]);
# The x tick labels are configured once; an earlier rotation=90 call was always
# overridden by this one and has been removed.
plt.xticks(rotation=30, ha='right', fontsize=14)
plt.yticks(fontsize=14)
plt.xlabel("page", fontsize=14)
plt.ylabel("Average number of page visits", fontsize=14)
plt.title("Average number of page visits: churned users vs unchurned users", fontsize=14)
plt.legend(loc='best', fontsize=14);

In [None]:
# Subscription level (free / paid) of the 'Cancellation Confirmation' events of churned users.
cancel_confirmations = df.where(
    (col('churn') == 1) & (col('page') == "Cancellation Confirmation")
)
df_pd = cancel_confirmations.groupby("level").count().toPandas()
level_ax = sns.barplot(x="level", y="count", data=df_pd)
level_ax.set_xlabel("level", fontsize=14)
level_ax.set_ylabel("Number of users churned", fontsize=14)
level_ax.set_title("Number of users churned while being: free subsribers vs paid subscribers", fontsize=14);

In [None]:
df_pd

In [None]:
# Gender split inside the churned and non-churned groups, counting each
# (userId, gender) pair once.
users_with_gender = df.dropDuplicates(["userId", "gender"])
gender_by_churn = users_with_gender.groupby(["churn", "gender"]).count().orderBy("churn")
df_gen = gender_by_churn.toPandas()
gender_ax = sns.barplot(x='churn', y='count', hue='gender', data=df_gen)
gender_ax.set_title("ratio of male female in churn and no churn user", fontsize=14);

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

After analyzing all the columns above, I have decided to use the following features in my model: <br/>
 1:- Gender
 2:- UserAgent
 3:- Status
 4:- Page
 5:- Level
 6:- ItemInSession

Once the columns were identified, they all had to be converted to a numeric datatype so that they could be fed into the chosen models. 
The Gender, UserAgent, Level and Page columns were converted into numeric values using a combination of string indexing and one-hot encoding.


In [None]:
#build pipeline
# Index each categorical feature column. handleInvalid="keep" sends values that were not
# present when the indexer was fitted (possible for rare values after a random split or
# inside a cross-validation fold) to an extra bucket instead of failing at transform time.
Gender_indexer = StringIndexer(inputCol="gender", outputCol='Gender_Index', handleInvalid="keep")
User_indexer = StringIndexer(inputCol="userAgent", outputCol='User_Index', handleInvalid="keep")
Page_indexer = StringIndexer(inputCol="page", outputCol='Page_Index', handleInvalid="keep")
Level_Indexer = StringIndexer(inputCol="level", outputCol='Level_Index', handleInvalid="keep")

# One-hot encode the indexed categories.
Gender_encoder = OneHotEncoder(inputCol='Gender_Index', outputCol='Gender_Vec')
User_encoder = OneHotEncoder(inputCol='User_Index', outputCol='User_Vec')
Page_encoder = OneHotEncoder(inputCol='Page_Index', outputCol='Page_Vec')
Level_encoder = OneHotEncoder(inputCol='Level_Index', outputCol='Level_Vec')

#create VectorAssembler to push data to ML models
# NOTE(review): 'page' contains the 'Cancellation Confirmation' events that define churn,
# so this feature may leak the label - confirm this is intended.
assembler = VectorAssembler(inputCols=["Gender_Vec", "User_Vec", "Page_Vec","Level_Vec", "itemInSession","status"], outputCol="features")
# churn -> label column. The default handleInvalid is kept here on purpose: an unexpected
# label value should fail loudly rather than be mapped to an extra class.
indexer = StringIndexer(inputCol="churn", outputCol="label")

#Lets normalize data
# Normalizer rescales each row's feature vector to unit norm; it is not a per-feature scaler.
scaler = Normalizer(inputCol="features", outputCol="ScaledFeatures");

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [None]:
# Candidate classifiers; all of them read the normalized feature vector and the indexed label.
lr = LogisticRegression(
    featuresCol="ScaledFeatures",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0,
)
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="ScaledFeatures",
    featureSubsetStrategy='sqrt',
)
gbt = GBTClassifier(
    featuresCol="ScaledFeatures",
    labelCol="label",
    featureSubsetStrategy='sqrt',
)

In [None]:
# Every pipeline shares the same preprocessing stages and differs only in its final classifier.
shared_stages = [
    Gender_indexer, User_indexer, Page_indexer, Level_Indexer,
    Gender_encoder, User_encoder, Page_encoder, Level_encoder,
    assembler, indexer, scaler,
]

pipeline_lr = Pipeline(stages=shared_stages + [lr])
pipeline_rf = Pipeline(stages=shared_stages + [rf])
pipeline_gbt = Pipeline(stages=shared_stages + [gbt])

In [None]:
#Train Test Split: randomly assign 90% of the event rows to the training set
#and hold out the remaining 10% for testing. The random seed is fixed at 42.
# NOTE(review): the split is per event row while churn is assigned per user, so events of
# the same user can end up in both sets - confirm this is intended.
train, test = df.randomSplit([0.9, 0.1], seed=42)

In [None]:
#Fit one baseline model per pipeline on the training split, before any parameter tuning

model_base_lr = pipeline_lr.fit(train)
model_base_rf = pipeline_rf.fit(train)
model_base_gbt = pipeline_gbt.fit(train)

In [None]:
# Scores a fitted model on a dataset (F1 by default).
def model_performance(model, test_data, metric = 'f1'):
    """ Score a fitted model on a dataset with a MulticlassClassificationEvaluator.
        Input:
            model - fitted model or pipeline exposing transform()
            test_data - data on which the performance is measured
            metric - evaluator metric name used to measure performance (default 'f1')
        Output:
            score returned by the evaluator for the model's predictions
    """
    predictions = model.transform(test_data)
    return MulticlassClassificationEvaluator(metricName=metric).evaluate(predictions)

In [None]:
# F1 score of each baseline model on the held-out test split.
F1_base_lr = model_performance(model_base_lr, test)
F1_base_rf = model_performance(model_base_rf, test)
F1_base_gbt = model_performance(model_base_gbt, test)

_base_f1_report = (
    ("Logistic Regression Base Model F1:{}", F1_base_lr),
    ("Random Forest Base Model F1:{}", F1_base_rf),
    ("GBT Classifier Base Model F1:{}", F1_base_gbt),
)
for _template, _f1_value in _base_f1_report:
    print(_template.format(round(_f1_value, 4)))

In [None]:
#Logistic Regression parameter tuning: 3-fold cross-validation over regParam
import time
start_time= time.time()
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1,]) \
    .build()

# F1 is the evaluator's default metric; it is spelled out here to make the target explicit.
cv_lr = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                          numFolds=3)

cvModel_lr = cv_lr.fit(train)

end_time= time.time()

# The precision argument belongs inside round(); previously the 3 was passed to format()
# as an unused argument and the elapsed time was rounded to whole seconds.
print("Total execution time for logistic:{}".format(round(end_time-start_time, 3)))

In [None]:
# Random forest parameter tuning: 3-fold cross-validation over impurity, maxDepth and numTrees
import time
start_time= time.time()
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.impurity,['entropy', 'gini']) \
    .addGrid(rf.maxDepth,[2, 4, 6]) \
    .addGrid(rf.numTrees,[10, 25, 50]) \
    .build()

# F1 is the evaluator's default metric; it is spelled out here to make the target explicit.
cv_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                             numFolds=3)

cvModel_rf = cv_rf.fit(train)
# The bare cvModel_rf.transform(test) call was removed: its result was discarded and
# DataFrame transformations are lazy, so it computed nothing.
avgMetrics_rf = cvModel_rf.avgMetrics


end_time= time.time()
# The precision argument belongs inside round(); previously the 3 was passed to format()
# as an unused argument and the elapsed time was rounded to whole seconds.
print("Total execution time for random forest model is:{}".format(round(end_time-start_time, 3)))

In [None]:
# GBT parameter tuning: 3-fold cross-validation over maxDepth and stepSize
import time
start_time= time.time()

paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth,[4,6]) \
    .addGrid(gbt.stepSize,[0.01, 0.1]) \
    .build()

# F1 is the evaluator's default metric; it is spelled out here to make the target explicit.
cv_gbt = CrossValidator(estimator=pipeline_gbt,
                             estimatorParamMaps=paramGrid_gbt,
                             evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                             numFolds=3)

cvModel_gbt = cv_gbt.fit(train)
# The bare cvModel_gbt.transform(test) call was removed: its result was discarded and
# DataFrame transformations are lazy, so it computed nothing.
avgMetrics_gbt = cvModel_gbt.avgMetrics
print (avgMetrics_gbt)
end_time= time.time()
# The message now names the GBT model (it previously said "random forest"), and the
# precision argument is passed to round() rather than to format().
print("Total execution time for GBT model is:{}".format(round(end_time-start_time, 3)))

In [None]:
#Score the cross-validated (tuned) models on the held-out test split
F1_lr = model_performance(cvModel_lr, test)
F1_rf = model_performance(cvModel_rf, test)
F1_gbt = model_performance(cvModel_gbt, test)

In [None]:
# Report the test F1 score of each tuned model, rounded to four decimals.
_tuned_f1_report = (
    ("Logistic Regression Improved Model F1:{}", F1_lr),
    ("Random Forest Classifier Improved Model F1:{}", F1_rf),
    ("GBT Classifier Improved Model F1:{}", F1_gbt),
)
for _template, _f1_value in _tuned_f1_report:
    print(_template.format(round(_f1_value, 4)))

In [None]:
# Accuracy of the tuned models on the held-out test split.
# model_performance (defined earlier in this notebook with an F1 default) is called with
# an explicit metric instead of being redefined with a different default: a second
# definition would silently shadow the first, so re-running the F1 cells afterwards
# would report accuracy under an F1 label.
Accuracy_lr = model_performance(cvModel_lr, test, metric='accuracy')
Accuracy_rf = model_performance(cvModel_rf, test, metric='accuracy')
Accuracy_gbt = model_performance(cvModel_gbt, test, metric='accuracy')

In [None]:
# Report the test accuracy of each tuned model, rounded to four decimals.
_accuracy_report = (
    ("Logistic Regression Model Accuracy:{}", Accuracy_lr),
    ("Random Forest Classifier Accuracy:{}", Accuracy_rf),
    ("GBT Classifier Accuracy:{}", Accuracy_gbt),
)
for _template, _accuracy_value in _accuracy_report:
    print(_template.format(round(_accuracy_value, 4)))

With an F1 score of 0.874 and an accuracy of 0.8948, the GBTClassifier performs best on the sample dataset and is therefore the preferred model. 

So that the best parameters can be reused on a different dataset, let us extract them from the grid-search result.

In [63]:
#Parameter Selection: report the hyper-parameters of the best GBT pipeline found by cross-validation
bestPipeline = cvModel_gbt.bestModel
# the GBT classifier is the last stage of the fitted pipeline
bestGBTModel = bestPipeline.stages[-1]
#bestParams = bestGBTModel.extractParamMap()
#bestParams
# NOTE(review): the values are read through the private _java_obj handle - presumably because
# the Python model wrapper does not expose these params in the Spark version used; confirm.
print('Best GBT Model: maxDepth={}'.format(bestGBTModel._java_obj.getMaxDepth()))
print('Best GBT Model: param stepSize={}'.format(bestGBTModel._java_obj.getStepSize()))

Best GBT Model: maxDepth=6
Best GBT Model: param stepSize=0.1


# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.

### Test Result on a medium size data

In [16]:
# Read the medium-size Sparkify event log (JSON) into a Spark DataFrame and show its first row.
df_new = spark.read.json("medium-sparkify-event-data.json")
df_new.head()

Row(artist='Martin Orford', auth='Logged In', firstName='Joseph', gender='M', itemInSession=20, lastName='Morales', length=597.55057, level='free', location='Corpus Christi, TX', method='PUT', page='NextSong', registration=1532063507000, sessionId=292, song='Grand Designs', status=200, ts=1538352011000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='293')

In [17]:
print((df_new.count(), len(df_new.columns)))

(543705, 18)


#### Data Processing on the medium size data

In [21]:
# Distinct ids of the medium-dataset users who reached the 'Cancellation Confirmation' page.
cancellation_events_new = df_new.where(col('page') == 'Cancellation Confirmation')
churned_user_ids = [
    row['userId']
    for row in cancellation_events_new.select('userId').distinct().collect()
]

In [None]:
# Label every medium-dataset event of a churned user with churn = 1; all others get churn = 0.
is_churned_user_new = col("userId").isin(churned_user_ids)
df_new = df_new.withColumn('churn', when(is_churned_user_new, 1).otherwise(0))

In [None]:
# Split the medium dataset (df_new, not the mini df) 90/10 and unpack both frames, so that
# `test` is a DataFrame rather than the two-element list returned by randomSplit.
# NOTE(review): df_new has not gone through the null / empty-userId clean-up that was
# applied to df - confirm whether that is needed before fitting or scoring on it.
train, test = df_new.randomSplit([0.9, 0.1], seed=42)