In [1]:
# Collapse the transactions table to one row per user (msno).
# NOTE(review): the first(...) columns are taken from whichever row Spark
# happens to see first in each group, so they are not necessarily from the
# same transaction as MAX(transaction_date) -- confirm this is acceptable.
sqlDF = spark.sql(
    "SELECT msno, "
    "MAX(transaction_date) as transaction_date, "
    "first(payment_method_id) as payment_method_id, "
    "AVG(payment_plan_days) as payment_plan_days,  "
    "first(plan_list_price) as plan_list_price, "
    "first(actual_amount_paid) as actual_amount_paid, "
    "first(is_auto_renew) as is_auto_renew, "
    "MAX(membership_expire_date) as membership_expire_date, "
    "first(is_cancel) as is_cancel "
    "FROM transactions_csv "
    "GROUP BY msno "
)
display(sqlDF)



In [2]:
# Per-user listening features: the mean of each user_logs counter, one row per
# user (msno).
# Fix: num_unq_avg was computed with SUM although its alias (and every sibling
# column) is an average; it now uses AVG so the column matches its name.
sqlDF2 = spark.sql("""
    SELECT msno,
           AVG(num_25)     AS num_25_avg,
           AVG(num_50)     AS num_50_avg,
           AVG(num_75)     AS num_75_avg,
           AVG(num_985)    AS num_985_avg,
           AVG(num_unq)    AS num_unq_avg,
           AVG(total_secs) AS total_secs_avg
    FROM user_logs
    GROUP BY msno
""")
display(sqlDF2)

In [3]:
# Labels: every column of the training table (includes is_churn).
sqlDF3 = spark.sql("SELECT * FROM train_csv")

# Full outer joins on the user id, so no user is lost at this stage:
# transactions <-> listening logs, then the result <-> labels.
final = sqlDF.join(sqlDF2, on='msno', how="outer")
final2 = final.join(sqlDF3, on='msno', how="outer")

# Keep only users that have listening features ...
final3 = final2.dropna(subset=["total_secs_avg"])
# ... and a churn label.
final4 = final3.dropna(subset=["is_churn"])

# Printing a DataFrame shows its column summary, not its rows.
print(final4)

In [4]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the model's feature vector.
# NOTE(review): num_985_avg is computed in sqlDF2 but is not listed here --
# confirm whether the omission is intentional.
feature_cols = [
    "num_25_avg", "num_50_avg", "num_75_avg", "num_unq_avg",
    "transaction_date", "payment_method_id", "payment_plan_days",
    "plan_list_price", "actual_amount_paid", "is_auto_renew",
    "membership_expire_date", "is_cancel", "total_secs_avg",
]

# final4 comes from outer joins and is only null-filtered on total_secs_avg
# and is_churn, so the transaction columns can still be null for users without
# transaction rows. VectorAssembler raises on null inputs by default, so drop
# incomplete rows explicitly instead of failing later during fit/transform.
final4_complete = final4.na.drop(subset=feature_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
final_df = assembler.transform(final4_complete)

# Peek at a few assembled vectors without truncating them.
final_df.select("features").show(4, False)

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Fixed seed so the train/test split -- and therefore the reported test
# error -- is the same on every Restart & Run All.
SPLIT_SEED = 42

# Map the is_churn label to indices. Fitted on the full dataset (before the
# split) so every label value is present in the index.
labelIndexer = StringIndexer(inputCol="is_churn", outputCol="indexedLabel").fit(final_df)

# Features with at most 10 distinct values are treated as categorical and
# indexed; the rest are left as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=10).fit(final_df)

# Split the data into training and test sets (30% held out for testing).
# Fix: the split was previously unseeded, so results changed on every run.
(trainingData, testData) = final_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(100)

# Select (prediction, true label) and compute test error.
# NOTE(review): accuracy can look good on a skewed label distribution --
# consider also reporting a metric such as f1 if is_churn is imbalanced.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# The fitted GBT model is the third pipeline stage (after the two indexers).
gbtModel = model.stages[2]
print(gbtModel)  # summary only

In [6]:
# Re-display the held-out error (1 - accuracy) computed by the evaluator.
print("Test Error = %g" % (1.0 - accuracy,))

In [7]:
# Show the first 100 test-set rows: predicted class, indexed true label and
# the raw feature vector.
predictions.select(
    "prediction",
    "indexedLabel",
    "features",
).show(n=100)