# Sparkify Project Workspace
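This workspace contains a mini subset (128MB) of the full dataset (12GB). Before deploying your project to the cloud, feel free to use the workspace to build your project or to explore this smaller dataset with Spark. Instructions for setting up a Spark cluster are in the optional Spark course content.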

Follow the steps below to work through the data analysis and modeling parts of the project.

In [82]:
# import libraries (duplicate and unused imports removed)
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



In [2]:
# create a Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Sparkify Project") \
    .getOrCreate()

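# Load and Clean Dataset
In this workspace, the mini dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without a `userId` or `sessionId`.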
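The cleaning rule used in the next cells (drop events with a null `userId` or `sessionId`, then drop events whose `userId` is an empty string) can be sketched in plain Python, independent of Spark. Events are assumed to be dicts shaped like the JSON records; `clean_events` and the sample records are hypothetical.

```python
# Plain-Python sketch of the notebook's cleaning rule (no Spark required).
# Each event is assumed to be a dict with "userId" and "sessionId" keys.


def clean_events(events):
    """Return the events that have a non-null sessionId and a non-empty userId."""
    cleaned = []
    for event in events:
        user_id = event.get("userId")
        session_id = event.get("sessionId")
        # like df.dropna(how="any", subset=["userId", "sessionId"])
        if user_id is None or session_id is None:
            continue
        # like df.filter(df["userId"] != "")
        if user_id == "":
            continue
        cleaned.append(event)
    return cleaned


# hypothetical sample records
sample = [
    {"userId": "30", "sessionId": 29, "page": "NextSong"},
    {"userId": "", "sessionId": 31, "page": "Home"},      # empty userId
    {"userId": None, "sessionId": 5, "page": "Home"},     # missing userId
    {"userId": "9", "sessionId": None, "page": "About"},  # missing sessionId
]
print([e["userId"] for e in clean_events(sample)])  # → ['30']
```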

In [3]:
# path to the mini dataset (JSON lines, one event per line)
event_data_path = 'mini_sparkify_event_data.json'
df = spark.read.json(event_data_path)

In [4]:
# drop events with a missing userId or sessionId
df_valid = df.dropna(how="any", subset=["userId", "sessionId"])
df_valid.count()

286500

In [5]:
# also drop events whose userId is an empty string
df_valid = df_valid.filter(df_valid["userId"] != "")
df_valid.count()

278154

In [6]:
df_valid.select('userId').dropDuplicates().count()

225

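# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.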

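### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define churn; these occur for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.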
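The `Churn` column created below is event-level: it is 1 only on the `Cancellation Confirmation` row itself, so `mean(Churn)` in the EDA queries is the share of events, not the share of users, that are cancellations. A per-user label takes the maximum of the flag over each user's events, which is what the feature-engineering step later does with `max(Churn)`. A minimal plain-Python sketch, with a hypothetical `label_users` helper and sample events:

```python
# Plain-Python sketch: derive one churn label per user from event-level flags.
from collections import defaultdict

CHURN_PAGE = "Cancellation Confirmation"


def label_users(events):
    """Return {userId: 1 if any of the user's events is CHURN_PAGE, else 0}."""
    labels = defaultdict(int)
    for event in events:
        flag = 1 if event["page"] == CHURN_PAGE else 0  # event-level flag
        # per-user max of the flag, like groupBy("userId").agg({"Churn": "max"})
        labels[event["userId"]] = max(labels[event["userId"]], flag)
    return dict(labels)


# hypothetical sample events
events = [
    {"userId": "1", "page": "NextSong"},
    {"userId": "1", "page": "Cancellation Confirmation"},
    {"userId": "2", "page": "NextSong"},
    {"userId": "2", "page": "Downgrade"},
]
print(label_users(events))  # → {'1': 1, '2': 0}
```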

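### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior of users who stayed versus users who churned. You can start by aggregating the data for these two groups and comparing how often a specific action occurs within a fixed period of time, or how many songs each group plays.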
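One such aggregate, the average number of songs played per user in each group, can be sketched in plain Python. It assumes a per-user label dict like the one a max-over-events rule produces; `avg_songs_by_group` and the sample data are hypothetical, and counting `NextSong` page events as songs played is an assumption of this sketch.

```python
# Plain-Python sketch: average songs played per user, churned vs. retained.
from collections import Counter, defaultdict


def avg_songs_by_group(events, labels):
    """labels maps userId -> 0/1; returns {label: mean NextSong count per user}."""
    # assumption: one "NextSong" page event == one song played
    songs_per_user = Counter(e["userId"] for e in events if e["page"] == "NextSong")
    per_group = defaultdict(list)
    for user_id, label in labels.items():
        per_group[label].append(songs_per_user.get(user_id, 0))
    return {label: sum(counts) / len(counts) for label, counts in per_group.items()}


# hypothetical data: user "a" churned, users "b" and "c" stayed
events = (
    [{"userId": "a", "page": "NextSong"},
     {"userId": "a", "page": "Cancellation Confirmation"}]
    + [{"userId": "b", "page": "NextSong"}] * 4
    + [{"userId": "c", "page": "NextSong"}] * 2
    + [{"userId": "c", "page": "Home"}]
)
labels = {"a": 1, "b": 0, "c": 0}
print(avg_songs_by_group(events, labels))  # → {1: 1.0, 0: 3.0}
```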

In [7]:
# event-level flag: 1 on the "Cancellation Confirmation" event itself, 0 on every other event
flag_loss_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
df_loss = df_valid.withColumn("Churn", flag_loss_event("page"))


In [8]:
df_loss.createOrReplaceTempView("df_loss_table")


In [8]:
spark.sql('''
          SELECT auth, mean(Churn) as Churn,count(*) as cnt
          FROM df_loss_table
          GROUP BY auth
          '''
          ).show()

+---------+-----+------+
|     auth|Churn|   cnt|
+---------+-----+------+
|Cancelled|  1.0|    52|
|Logged In|  0.0|278102|
+---------+-----+------+



In [15]:
spark.sql('''
          SELECT gender, mean(Churn) as Churn,count(*) as cnt,sum(Churn) as sum_churn
          FROM df_loss_table
          GROUP BY gender
          '''
          ).show()

+------+--------------------+------+---------+
|gender|               Churn|   cnt|sum_churn|
+------+--------------------+------+---------+
|     F|1.293845178485942...|154578|       20|
|     M|2.589499579206318...|123576|       32|
+------+--------------------+------+---------+



In [16]:
spark.sql('''
          SELECT level, mean(Churn) as Churn,count(*) as cnt,sum(Churn) as sum_churn
          FROM df_loss_table
          GROUP BY level
          '''
          ).show()

+-----+--------------------+------+---------+
|level|               Churn|   cnt|sum_churn|
+-----+--------------------+------+---------+
| free|3.768776583334829E-4| 55721|       21|
| paid|1.393678096325635...|222433|       31|
+-----+--------------------+------+---------+



In [17]:
spark.sql('''
          SELECT method, mean(Churn) as Churn,count(*) as cnt,sum(Churn) as sum_churn
          FROM df_loss_table
          GROUP BY method
          '''
          ).show()

+------+--------------------+------+---------+
|method|               Churn|   cnt|sum_churn|
+------+--------------------+------+---------+
|   PUT|                 0.0|257818|        0|
|   GET|0.002557041699449...| 20336|       52|
+------+--------------------+------+---------+



In [18]:
spark.sql('''
          SELECT status, mean(Churn) as Churn,count(*) as cnt,sum(Churn) as sum_churn
          FROM df_loss_table
          GROUP BY status
          '''
          ).show()

+------+--------------------+------+---------+
|status|               Churn|   cnt|sum_churn|
+------+--------------------+------+---------+
|   307|                 0.0| 23184|        0|
|   404|                 0.0|   252|        0|
|   200|2.041473315588219E-4|254718|       52|
+------+--------------------+------+---------+



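# Feature Engineering
Once you've familiarized yourself with the data, build the features you think will help most in training your model. To work with the full dataset, you can follow these steps:
- Write a script to extract the features you need from the small subset of data
- Make sure your script scales to the full dataset, using the best practices taught earlier
- Run your script on the full dataset, debugging your code as needed

If you are working in the classroom workspace, you can simply extract features from the small dataset provided there. When you start using a Spark cluster, be sure to carry this work over to the full dataset.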
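As a sketch of the "write a script" step, the function below aggregates newline-delimited JSON events into one row per user with the same columns as the `df_user` table built later (`song_cnt`, `paid_cnt`, `free_cnt`, `Churn`). It is plain Python, so it only illustrates the per-user logic; on the full 12GB dataset the same aggregation would run in Spark. `extract_user_features` is hypothetical, and unlike the notebook it does not drop events that occur after a cancellation.

```python
# Plain-Python sketch of a per-user feature script over newline-delimited JSON.
import io
import json
from collections import defaultdict


def extract_user_features(lines):
    """Aggregate JSON-lines events into {userId: feature dict}.

    Columns mirror the notebook's df_user: song_cnt, paid_cnt, free_cnt, Churn.
    Memory grows with the number of users, not the number of events.
    """
    feats = defaultdict(lambda: {"song_cnt": 0, "paid_cnt": 0, "free_cnt": 0, "Churn": 0})
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        event = json.loads(line)
        user_id = event.get("userId")
        # same cleaning rule as the notebook: non-empty userId and a sessionId
        if not user_id or event.get("sessionId") is None:
            continue
        row = feats[user_id]
        if event.get("song") is not None:
            row["song_cnt"] += 1  # like count(song), which skips nulls
        if event.get("level") == "paid":
            row["paid_cnt"] += 1
        elif event.get("level") == "free":
            row["free_cnt"] += 1
        if event.get("page") == "Cancellation Confirmation":
            row["Churn"] = 1
    return dict(feats)


# io.StringIO stands in for an open file handle
sample_file = io.StringIO(
    '{"userId": "30", "sessionId": 29, "level": "paid", "page": "NextSong", "song": "Rockpools"}\n'
    '{"userId": "30", "sessionId": 29, "level": "paid", "page": "Home"}\n'
    '{"userId": "", "sessionId": 31, "level": "free", "page": "Home"}\n'
)
print(extract_user_features(sample_file))
# → {'30': {'song_cnt': 1, 'paid_cnt': 2, 'free_cnt': 0, 'Churn': 0}}
```

With a real file, `with open('mini_sparkify_event_data.json') as f: extract_user_features(f)` streams the events line by line.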

In [9]:
user_log = df_loss

In [10]:
user_log.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30', Churn=0)

In [11]:
# running count of cancellation events per user, in time order
windowval = Window.partitionBy("userId").orderBy("ts").rangeBetween(Window.unboundedPreceding, 0)
user_log_valid = user_log.withColumn("phase", Fsum("Churn").over(windowval))
# keep events before the cancellation (phase == 0) plus the cancellation event itself
user_log_valid = user_log_valid.filter((user_log_valid.phase == 0) | (user_log_valid.page == 'Cancellation Confirmation'))
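The window above computes, for each event, how many cancellations the user has had up to that point (`phase`), and the filter keeps everything before the first cancellation plus the cancellation event itself. A plain-Python sketch of the same rule for one user's events follows; `drop_post_churn_events` is hypothetical. It counts row by row, which matches `rowsBetween` rather than `rangeBetween` when several events share the cancellation's timestamp.

```python
# Plain-Python sketch of the phase filter for a single user's events.
CHURN_PAGE = "Cancellation Confirmation"


def drop_post_churn_events(user_events):
    """Keep events up to and including the user's first cancellation.

    phase is a running count of cancellation events in timestamp order,
    like Fsum("Churn") over a window ordered by ts; rows are counted one at a time.
    """
    kept = []
    phase = 0
    for event in sorted(user_events, key=lambda e: e["ts"]):
        is_cancel = event["page"] == CHURN_PAGE
        if is_cancel:
            phase += 1
        # same condition as (phase == 0) | (page == 'Cancellation Confirmation')
        if phase == 0 or is_cancel:
            kept.append(event)
    return kept


# hypothetical events for one user, deliberately out of order
events = [
    {"ts": 3, "page": "Home"},
    {"ts": 1, "page": "NextSong"},
    {"ts": 2, "page": "Cancellation Confirmation"},
]
print([e["ts"] for e in drop_post_churn_events(events)])  # → [1, 2]
```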

In [12]:
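# per user: churn label (max of the event-level flag) and number of songs played
df_user_var1 = user_log_valid.groupBy('userId') \
    .agg({'Churn': 'max', 'song': 'count'}) \
    .withColumnRenamed('max(Churn)', 'Churn') \
    .withColumnRenamed('count(song)', 'song_cnt')

In [13]:
# per user: number of events logged while on the paid tier
df_user_var2 = user_log_valid.filter(user_log_valid.level == 'paid') \
    .groupBy('userId').agg({'level': 'count'}) \
    .withColumnRenamed('count(level)', 'paid_cnt')

In [14]:
# per user: number of events logged while on the free tier
df_user_var3 = user_log_valid.filter(user_log_valid.level == 'free') \
    .groupBy('userId').agg({'level': 'count'}) \
    .withColumnRenamed('count(level)', 'free_cnt')

In [15]:
# left joins keep users who never appear in the paid or free subsets
df_user = df_user_var1.join(df_user_var2, ['userId'], 'left').join(df_user_var3, ['userId'], 'left')

In [16]:
# users missing from a subset get a count of 0 instead of null
df_user = df_user.fillna(0)

In [41]:
# drop the identifier so only the features and the label remain
df_user = df_user.drop('userId')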

In [42]:
df_user.take(1)

[Row(song_cnt=275, Churn=0, paid_cnt=0, free_cnt=381)]

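# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you have learned. Evaluate the accuracy of the different models, tuning parameters as necessary. Pick the best-performing model based on test accuracy, then report its results on the validation set. Since churned users are a fairly small subset, I suggest using the F1 score as the metric to optimize.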
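The cell below splits 0.6/0.4 into train and test only; a three-way split as described here could pass three weights, e.g. `df_user.randomSplit([0.6, 0.2, 0.2], seed=42)`. The plain-Python sketch below (hypothetical `random_split`) mimics the idea: each row is sampled independently, so split sizes are only approximately proportional to the weights, as with Spark's `randomSplit`.

```python
# Plain-Python sketch of a weighted random split (train / validation / test).
import random


def random_split(rows, weights, seed=42):
    """Assign each row independently to a split, with probability ~ weight.

    Weights are normalized, so they do not need to sum to 1.
    Split sizes are only approximately proportional to the weights.
    """
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits


train, validation, test = random_split(range(225), [0.6, 0.2, 0.2])
print(len(train), len(validation), len(test))
```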

In [54]:
train, test = df_user.randomSplit([0.6, 0.4], seed=42)


In [44]:
train.take(5)

[Row(song_cnt=8, Churn=1, paid_cnt=0, free_cnt=10),
 Row(song_cnt=150, Churn=0, paid_cnt=0, free_cnt=201),
 Row(song_cnt=4079, Churn=0, paid_cnt=4825, free_cnt=0),
 Row(song_cnt=2841, Churn=1, paid_cnt=2859, free_cnt=578),
 Row(song_cnt=820, Churn=0, paid_cnt=858, free_cnt=144)]

In [55]:
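# combine the three count features into a single vector column
assembler = VectorAssembler(inputCols=['song_cnt', 'paid_cnt', 'free_cnt'], outputCol='features')
# index Churn into the 'label' column that LogisticRegression reads by default
indexer = StringIndexer(inputCol='Churn', outputCol='label')

lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0.0)

pipeline = Pipeline(stages=[assembler, indexer, lr])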
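`StringIndexer` assigns indices by label frequency by default (`stringOrderType='frequencyDesc'`), casting numeric input to strings. Because retained users are the majority here, `Churn=0` becomes label `0.0` and `Churn=1` becomes `1.0`; if churners were ever the majority, the mapping would silently flip. A plain-Python sketch of that ordering (hypothetical `frequency_index`; the alphabetical tie-breaking is an assumption of the sketch):

```python
# Plain-Python sketch of frequency-ordered label indexing.
from collections import Counter


def frequency_index(values):
    """Map each distinct value (as a string) to an index, most frequent first.

    Mimics StringIndexer's default stringOrderType='frequencyDesc';
    alphabetical tie-breaking is an assumption of this sketch.
    """
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


print(frequency_index([0, 0, 0, 1]))  # → {'0': 0.0, '1': 1.0}
print(frequency_index([1, 1, 1, 0]))  # → {'1': 0.0, '0': 1.0}
```

Using the 0/1 column directly, e.g. `df_user.withColumn('label', df_user.Churn.cast('double'))`, avoids depending on that ordering.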

In [67]:
model = pipeline.fit(train)

In [68]:
results = model.transform(test)

In [70]:
print(results.filter(results.label == results.prediction).count())
print(results.count())

62
83
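The two counts above give an accuracy of 62/83, about 0.747. Since churners are a minority, accuracy can look reasonable even when the churn class is never predicted, which is why F1 was suggested. `MulticlassClassificationEvaluator(metricName='f1')` reports a weighted F1: each class's F1 weighted by its share of the true labels. A plain-Python sketch of that computation (helper names are hypothetical), with a toy case where a model that always predicts "no churn" scores 0.8 accuracy but 0 F1 on the churn class:

```python
# Plain-Python sketch of accuracy and weighted F1 from (label, prediction) pairs.
from collections import Counter


def accuracy(pairs):
    return sum(1 for y, p in pairs if y == p) / len(pairs)


def per_class_f1(pairs):
    """F1 for every class that occurs as a true label."""
    scores = {}
    for cls, support in Counter(y for y, _ in pairs).items():
        tp = sum(1 for y, p in pairs if y == cls and p == cls)
        predicted = sum(1 for _, p in pairs if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / support
        denom = precision + recall
        scores[cls] = 2 * precision * recall / denom if denom else 0.0
    return scores


def weighted_f1(pairs):
    """Per-class F1 weighted by each class's share of the true labels."""
    support = Counter(y for y, _ in pairs)
    f1 = per_class_f1(pairs)
    return sum(support[c] / len(pairs) * f1[c] for c in support)


print(round(62 / 83, 3))  # accuracy from the counts above → 0.747

# toy case: 8 retained and 2 churned users, model always predicts 0.0 (no churn)
pairs = [(0.0, 0.0)] * 8 + [(1.0, 0.0)] * 2
print(accuracy(pairs))               # → 0.8
print(per_class_f1(pairs)[1.0])      # → 0.0
print(round(weighted_f1(pairs), 3))  # → 0.711
```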


In [83]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .build()


crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                          numFolds=3)
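`CrossValidator` with `numFolds=3` fits the pipeline once per fold for every parameter combination, averages the metric over the folds, and refits the best combination on the whole training set. The plain-Python sketch below shows the selection loop with a toy threshold "model"; `k_folds`, `cross_validate` and the toy data are hypothetical, and folds are contiguous here for simplicity, whereas Spark assigns rows to folds at random.

```python
# Plain-Python sketch of k-fold cross-validation over a parameter grid.


def k_folds(n, k):
    """Yield (train_idx, val_idx) for k contiguous folds over n rows (k <= n)."""
    start = 0
    for i in range(k):
        size = n // k + (1 if i < n % k else 0)
        val_idx = list(range(start, start + size))
        train_idx = list(range(0, start)) + list(range(start + size, n))
        yield train_idx, val_idx
        start += size


def cross_validate(rows, param_grid, fit, evaluate, k=3):
    """Return (best_param, {param: mean validation metric}); higher is better."""
    avg_metrics = {}
    for param in param_grid:
        fold_scores = []
        for train_idx, val_idx in k_folds(len(rows), k):
            model = fit([rows[i] for i in train_idx], param)
            fold_scores.append(evaluate(model, [rows[i] for i in val_idx]))
        avg_metrics[param] = sum(fold_scores) / len(fold_scores)
    best = max(avg_metrics, key=avg_metrics.get)
    return best, avg_metrics


# toy "model": predict churn (1) when the feature x is at least the threshold
rows = [(x, 1 if x >= 5 else 0) for x in range(12)]


def fit(train_rows, threshold):
    return threshold  # nothing to learn in this toy; the model is the threshold


def evaluate(threshold, val_rows):
    return sum(1 for x, y in val_rows if (x >= threshold) == (y == 1)) / len(val_rows)


best, avg_metrics = cross_validate(rows, [3, 5, 8], fit, evaluate, k=3)
print(best)  # → 5
```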

In [84]:
cvModel_q1 = crossval.fit(train)

In [85]:
results_cv = cvModel_q1.transform(test)
print(results_cv.filter(results_cv.label == results_cv.prediction).count())
print(results_cv.count())

62
83


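# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark project overview page and the Data Scientist Capstone project rubric to make sure your project includes every required component and meets all rubric requirements. Remember to include thorough documentation in the form of a README file in your GitHub repository, along with a web app or blog post.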