In [54]:
from pyspark.sql import SparkSession
import pyspark.sql.types as tp
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import when, sum, avg, col, count
from pyspark.sql.types import LongType, IntegerType
from pyspark.sql.functions import udf
import pyspark.sql.functions as f

# Single Spark session for the notebook; requests 128g of driver memory.
spark = (
    SparkSession.builder
    .appName("ChurnPredictions")
    .config("spark.driver.memory", "128g")
    .getOrCreate()
)
sc = spark.sparkContext

# Explicit schema for the posts CSV: ids and counters are decimal(10), the two
# date columns are parsed as dates, and every field is nullable.
post_schema = tp.StructType([
    tp.StructField(name=column_name, dataType=column_type, nullable=True)
    for column_name, column_type in (
        ('PostId', tp.DecimalType(10)),
        ('PostTypeId', tp.DecimalType(10)),
        ('AcceptedAnswerId', tp.DecimalType(10)),
        ('CreationDate', tp.DateType()),
        ('Score', tp.DecimalType(10)),
        ('OwnerUserId', tp.DecimalType(10)),
        ('AnswerCount', tp.DecimalType(10)),
        ('CommentCount', tp.DecimalType(10)),
        ('ParentId', tp.DecimalType(10)),
        ('CreationDateOfOwner', tp.DateType()),
        ('BodyWordNum', tp.DecimalType(10)),
    )
])

# Schema shared by the training and validation user files.
users_schema = tp.StructType([
    tp.StructField(name=column_name, dataType=column_type, nullable=True)
    for column_name, column_type in (
        ('Id', tp.DecimalType(10)),
        ('CreationDate', tp.DateType()),
        ('isChurn', tp.BooleanType()),
    )
])

# Posts: read with the explicit schema, fill nulls with 0, and rename the post
# creation date so it cannot collide with the users' CreationDate column.
posts_dist_df = (
    spark.read.csv("./data/posts_dist.csv", schema=post_schema, header=True)
    .na.fill(0)
    .withColumnRenamed("CreationDate", "PostCreationDate")
)
users_train_df = spark.read.csv("./data/users_train_dist.csv", schema=users_schema, header=True)
users_val_df = spark.read.csv("./data/users_val_dist.csv", schema=users_schema, header=True)

# Print all three schemas first, then a preview of each table.
posts_dist_df.printSchema()
users_train_df.printSchema()
users_val_df.printSchema()

posts_dist_df.show()
users_train_df.show()
users_val_df.show()

root
 |-- PostId: decimal(10,0) (nullable = true)
 |-- PostTypeId: decimal(10,0) (nullable = true)
 |-- AcceptedAnswerId: decimal(10,0) (nullable = true)
 |-- PostCreationDate: date (nullable = true)
 |-- Score: decimal(10,0) (nullable = true)
 |-- OwnerUserId: decimal(10,0) (nullable = true)
 |-- AnswerCount: decimal(10,0) (nullable = true)
 |-- CommentCount: decimal(10,0) (nullable = true)
 |-- ParentId: decimal(10,0) (nullable = true)
 |-- CreationDateOfOwner: date (nullable = true)
 |-- BodyWordNum: decimal(10,0) (nullable = true)

root
 |-- Id: decimal(10,0) (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- isChurn: boolean (nullable = true)

root
 |-- Id: decimal(10,0) (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- isChurn: boolean (nullable = true)

+------+----------+----------------+----------------+-----+-----------+-----------+------------+--------+-------------------+-----------+
|PostId|PostTypeId|AcceptedAnswerId|PostCreationDate|Score|Own

In [106]:
# NOTE(review): `train` is not assigned in any cell visible here — presumably the
# posts table joined with users_train_df; confirm before a Restart & Run All.
train.show()

+-------+----------+----------------+----------------+-----+-----------+------------+--------+-----------+---+------------+-------+
| PostId|PostTypeId|AcceptedAnswerId|PostCreationDate|Score|AnswerCount|CommentCount|ParentId|BodyWordNum| Id|CreationDate|isChurn|
+-------+----------+----------------+----------------+-----+-----------+------------+--------+-----------+---+------------+-------+
|  11199|         1|           11215|      2008-08-14|    5|         15|           0|       0|         68|299|  2008-08-04|  false|
|  11311|         1|           11323|      2008-08-14|   21|         12|           2|       0|         73|299|  2008-08-04|  false|
|  28377|         1|           28452|      2008-08-26|   98|          9|           0|       0|         18|299|  2008-08-04|  false|
|  28478|         1|           28498|      2008-08-26|   10|          2|           0|       0|         54|299|  2008-08-04|  false|
|  50565|         1|           50590|      2008-09-08|    1|          2|    

In [50]:
def DaysDiffCategorizer(DaysDiff):
    """Bucket a day count into the labels "early", "mid" or "late".

    Parameters
    ----------
    DaysDiff : int or None
        Number of days to classify.

    Returns
    -------
    str or None
        "early" when DaysDiff <= 30, "mid" when 30 < DaysDiff < 210,
        "late" when DaysDiff >= 210, and None when DaysDiff is None.
    """
    if DaysDiff is None:
        # A Python UDF receives null column values as None; comparing None with
        # an int raises TypeError on Python 3, so pass the null through instead.
        return None
    if DaysDiff <= 30:
        return "early"
    if DaysDiff < 210:
        return "mid"
    return "late"

In [59]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when, sum, avg, col, count, stddev, round

def ExtractFeatures(df, posts_df=None):
    """Aggregate a post-level frame into one feature row per user.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        One row per post, already joined with its owner. The columns read here
        are Id, CreationDate, isChurn, PostCreationDate, PostTypeId, Score,
        AnswerCount, CommentCount and BodyWordNum.
    posts_df : pyspark.sql.DataFrame, optional
        Posts table (PostId, PostTypeId, AcceptedAnswerId, OwnerUserId) used to
        count accepted answers. Defaults to the notebook-level
        ``posts_dist_df``, which is what this function read before the
        parameter existed.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns, in order: Id, Questions, Answer, Early, Mid, Late,
        sum_/avg_/std_ of scr, ans, cmt and wrd, AcceptedAnswerCnt, isChurn.
        AcceptedAnswerCnt used to be computed and then discarded; it is now
        part of the result (0 for users without an accepted answer).
    """
    if posts_df is None:
        # Keep the historical behaviour of reading the notebook-level posts table.
        posts_df = posts_dist_df

    # 1. Days between account creation and each post; rows with a negative or
    #    null difference are dropped by the filter.
    df = df.withColumn(
        "DaysDiff",
        F.datediff(F.col("PostCreationDate"), F.col("CreationDate")).cast(IntegerType()),
    )
    df = df.filter(F.col("DaysDiff") >= 0)
    df = df.withColumn("Id", F.col("Id").cast(IntegerType()))

    # 2. Bin DaysDiff into early / mid / late with the shared categorizer.
    bucket_udf = udf(DaysDiffCategorizer, StringType())
    df = df.withColumn("Time", bucket_udf("DaysDiff"))
    df = df.drop("DaysDiff").drop("PostCreationDate").drop("CreationDate")

    # 3-5. One pass per user: question/answer counts, posts per time bucket,
    #      and sum/avg/stddev of the numeric post attributes. count() skips the
    #      nulls produced by when() without otherwise(), so each conditional
    #      count is the number of matching rows (0 when there are none).
    per_user = df.groupBy("Id").agg(
        F.count(F.when(F.col("PostTypeId") == 1, 1)).alias("Questions"),
        F.count(F.when(F.col("PostTypeId") == 2, 1)).alias("Answer"),
        F.count(F.when(F.col("Time") == "early", 1)).alias("Early"),
        F.count(F.when(F.col("Time") == "mid", 1)).alias("Mid"),
        F.count(F.when(F.col("Time") == "late", 1)).alias("Late"),
        F.sum("Score").alias("sum_scr"),
        F.avg("Score").alias("avg_scr"),
        F.round(F.stddev("Score"), 4).alias("std_scr"),
        F.sum("AnswerCount").alias("sum_ans"),
        F.avg("AnswerCount").alias("avg_ans"),
        F.round(F.stddev("AnswerCount"), 4).alias("std_ans"),
        F.sum("CommentCount").alias("sum_cmt"),
        F.avg("CommentCount").alias("avg_cmt"),
        F.round(F.stddev("CommentCount"), 4).alias("std_cmt"),
        F.sum("BodyWordNum").alias("sum_wrd"),
        F.avg("BodyWordNum").alias("avg_wrd"),
        F.round(F.stddev("BodyWordNum"), 4).alias("std_wrd"),
    ).na.fill(0)  # stddev is null for a single post; treat it as 0

    # 6. Accepted answers per user: match each question's AcceptedAnswerId to
    #    the answer post with that PostId and count by the answer's owner.
    #    Columns are aliased so the self-join on posts_df has unambiguous names,
    #    and the owner id is cast to match the integer Id used above.
    accepted_ids = posts_df.filter(F.col("PostTypeId") == 1).select("AcceptedAnswerId")
    answers = posts_df.filter(F.col("PostTypeId") == 2).select(
        F.col("PostId").alias("AnswerPostId"),
        F.col("OwnerUserId").cast(IntegerType()).alias("Id"),
    )
    accepted_cnt = (
        accepted_ids
        .join(answers, F.col("AcceptedAnswerId") == F.col("AnswerPostId"), "inner")
        .groupBy("Id")
        .agg(F.count("*").alias("AcceptedAnswerCnt"))
    )

    # Label per user (one row per distinct Id/isChurn pair, as before).
    churn_df = df.select("Id", "isChurn").distinct()

    # Left join so users without any accepted answer are kept with a count of 0.
    result = (
        per_user
        .join(accepted_cnt, "Id", "left")
        .na.fill(0, subset=["AcceptedAnswerCnt"])
        .join(churn_df, "Id")
    )
    return result

# Build the per-user feature table from the joined training frame and preview it.
train_df = ExtractFeatures(df=train)
train_df.show(n=20, truncate=True)

In [None]:
# Write each split as a single-partition CSV with a header row.
(train.repartition(1)
      .write.format('com.databricks.spark.csv')
      .option("header", 'true')
      .save("./train.csv"))
(val.repartition(1)
    .write.format('com.databricks.spark.csv')
    .option("header", 'true')
    .save("./val.csv"))

In [53]:

from pyspark.sql import functions as F
from pyspark.sql.functions import when, sum, avg, col, count
from pyspark.sql.types import LongType, IntegerType
from pyspark.sql.functions import udf
import pyspark.sql.functions as f

# Binning DaysDiff into Early, Mid, Late.
def categorizer(DaysDiff):
    """Map a day count to "early" (<= 30), "mid" (31-209) or "late" (>= 210).

    Returns None when DaysDiff is None so that a null column value handed to
    this function by a Spark UDF yields a null label instead of raising
    TypeError on the None/int comparison.
    """
    if DaysDiff is None:
        return None
    if DaysDiff <= 30:
        return "early"
    if DaysDiff < 210:
        return "mid"
    return "late"

def ExtractFeatures(df, dfu):
    """Build per-user activity features from a posts table and attach the churn label.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Posts. Columns read: PostId, Score, AnswerCount, CommentCount,
        BodyWordNum, CreationDateOfOwner, the post date (either
        ``PostCreationDate`` or ``CreationDate``) and the owner's id (either
        ``Id`` or ``OwnerUserId``).
    dfu : pyspark.sql.DataFrame
        Users. Columns read: Id and isChurn.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per user present in both inputs, with columns Id, TotalPost,
        TotalScore, TotalAnswers, TotalCmt, TotalWords, #Early, #Mid, #Late
        and isChurn (1 for churned users, otherwise 0).
    """
    # The posts table is loaded with CreationDate renamed to PostCreationDate
    # and the owner stored as OwnerUserId; accept both spellings so the
    # function works on the raw and on the renamed frame.
    post_date_col = "PostCreationDate" if "PostCreationDate" in df.columns else "CreationDate"
    if "Id" not in df.columns:
        df = df.withColumnRenamed("OwnerUserId", "Id")

    # Cast the counters that are summed below to plain integers.
    for counter in ("CommentCount", "BodyWordNum", "Score"):
        df = df.withColumn(counter, F.col(counter).cast(IntegerType()))

    # Days between the owner's account creation and the post; rows with a
    # negative or null difference are dropped by the filter.
    df = df.withColumn(
        "DaysDiff",
        F.datediff(F.col(post_date_col), F.col("CreationDateOfOwner")).cast(IntegerType()),
    )
    df = df.filter(F.col("DaysDiff") >= 0)

    bucket_udf = udf(categorizer, StringType())
    df = df.withColumn("When", bucket_udf("DaysDiff"))

    # Per-user totals plus the number of posts in each time bucket. count()
    # ignores the nulls produced by when() without otherwise(), so each bucket
    # column is the number of matching posts (0 when there are none).
    users_df = df.groupBy("Id").agg(
        F.count("PostId").alias("TotalPost"),
        F.sum("Score").alias("TotalScore"),
        F.sum("AnswerCount").alias("TotalAnswers"),
        F.sum("CommentCount").alias("TotalCmt"),
        F.sum("BodyWordNum").alias("TotalWords"),
        F.count(F.when(F.col("When") == "early", 1)).alias("#Early"),
        F.count(F.when(F.col("When") == "mid", 1)).alias("#Mid"),
        F.count(F.when(F.col("When") == "late", 1)).alias("#Late"),
    )

    # Join by column name so exactly one Id column survives. The previous
    # expression join followed by drop("Id") removed both Id columns and left
    # nothing to join the bucket counts on.
    labels = dfu.select("Id", "isChurn")
    users_df = users_df.join(labels, "Id")

    # Encode the label as 1 / 0.
    users_df = users_df.withColumn(
        "isChurn", F.when(F.col("isChurn") == "True", F.lit(1)).otherwise(0)
    )
    return users_df




In [None]:
# Build the training feature table from the posts and the training users, then preview it.
train_df = ExtractFeatures(df=posts_dist_df, dfu=users_train_df)
train_df.show(n=20, truncate=True)

In [None]:
# Validation features. The validation users are loaded as `users_val_df`;
# the previous name `users_val_dist_df` is never defined and raised NameError.
val_df = ExtractFeatures(posts_dist_df, users_val_df)
val_df.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.sql.functions import col
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import LinearSVC

# Pack the per-user numeric columns into the single "features" vector column.
assembler = (
    VectorAssembler()
    .setInputCols([
        "TotalPost",
        "TotalScore",
        "TotalAnswers",
        "TotalCmt",
        "TotalWords",
        "#Early",
        "#Mid",
        "#Late",
    ])
    .setOutputCol("features")
)

# Training matrix: churn flag as "label" next to the assembled feature vector.
train_df_transformed = assembler.transform(train_df)
final_df = train_df_transformed.select(
    train_df_transformed["isChurn"].alias("label"),
    train_df_transformed["features"],
)

# Linear SVM with 10 iterations and L2 regularisation strength 0.1.
lsvc = LinearSVC().setMaxIter(10).setRegParam(0.1)
lsvcModel = lsvc.fit(final_df)


In [None]:
# Assemble the validation features with the same assembler used for training.
val_df_transformed = assembler.transform(val_df)
# Select from the validation frame; the earlier code re-selected the training
# frame here, so the validation data was never used.
val_final_df = val_df_transformed.select(col("isChurn").alias("label"), col("features"))
# Score a DataFrame with transform(); predict() takes a single feature vector
# and cannot be called without an argument.
val_predictions = lsvcModel.transform(val_final_df)
val_predictions.select("label", "prediction").show()

In [None]:
# Construct validation x,y: pair every post with its owner's row from the
# validation users. The users frame is loaded as `users_val_df`; the previous
# name `users_val_dist_df` is never defined.
val_x = users_val_df.join(posts_dist_df, posts_dist_df.OwnerUserId == users_val_df.Id)
val_x.show()

In [None]:
# Collect the Spark feature table onto the driver as a pandas DataFrame.
ppdf = train_df.toPandas()
