In [0]:
%pyspark
# Load the charity base table and keep only rows with a positive CustomerAge;
# the two identifier columns are dropped before any further processing.
charity_raw = spark.read.parquet("hdfs://thankgod/tmp/charity-base-with-status.parquet")
df = charity_raw.where(charity_raw["CustomerAge"] > 0).drop("id", "CustomerID")

# NOTE(review): this ~2.36% sample has no seed, and within the visible cells
# `simple` is rebuilt from the full `df` before it is ever used -- confirm
# whether the sample is still wanted.
simple = df.sample(.0236)

df.show(4)

In [1]:
%pyspark
# Per-column summary statistics of the filtered frame.
summary_stats = df.describe()
summary_stats.show()

In [2]:
%pyspark
from pyspark.sql.types import StringType,IntegerType
from pyspark.sql.functions import udf
def _status_to_label(status):
    """Return "No" when the raw status is exactly the string "0", else "Yes"."""
    # NOTE(review): anything that is not the string "0" -- including nulls and,
    # if the column is numeric, the integer 0 -- maps to "Yes". Confirm the
    # CharityStatus column is string-typed.
    if status == "0":
        return "No"
    return "Yes"


y_udf = udf(_status_to_label, StringType())

# Replace the raw CharityStatus column with the readable "Charity" label.
labelled = df.withColumn("Charity", y_udf("CharityStatus"))
simple = labelled.drop("CharityStatus")
simple.show(4)

In [3]:
%pyspark

def age_multiple(age):
    """Bucket a numeric age into one of four descriptive age bands.

    Args:
        age: Age as an int/float, or None when the value is missing
            (e.g. a null produced by a failed integer cast).

    Returns:
        'Under 25' for age < 25, 'Between 25 and 35' for 25 <= age <= 35,
        'Between 36 and 49' for 35 < age < 50, 'Over 50' for age >= 50,
        and 'N/A' for None or any value that matches no band (e.g. NaN).
    """
    # A null age would raise TypeError on comparison under Python 3, so it is
    # mapped to the fallback label explicitly.
    if age is None:
        return 'N/A'
    # Strictly below 25: age 25 itself belongs to the "25 and 35" band, as the
    # band's own label states.
    if age < 25:
        return 'Under 25'
    if age <= 35:
        return 'Between 25 and 35'
    if age < 50:
        return 'Between 36 and 49'
    if age >= 50:
        return 'Over 50'
    # Only reachable for values that fail every comparison (such as NaN).
    return 'N/A'
        
        
# Wrap the Python bucketing function as a Spark UDF and also register it under
# the same name for use from SQL.
age_multiple_udf = udf(age_multiple, StringType())
spark.udf.register("age_multiple_udf", age_multiple_udf)

# Cast CustomerAge to an integer first, then overwrite it with its age band.
# NOTE(review): this overwrites CustomerAge in place, so re-running the cell
# feeds the band labels back through the integer cast -- restart from the
# cell that builds `simple` before re-running.
age_as_int = simple["CustomerAge"].cast(IntegerType())
simple = (
    simple
    .withColumn("CustomerAge", age_as_int)
    .withColumn("CustomerAge", age_multiple_udf("CustomerAge"))
)
simple.show(4)

In [4]:
%pyspark
simple.select("Charity").show(4)

In [5]:
%pyspark
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# --- String -> index stages ------------------------------------------------
# Each categorical column gets its own indexer; handleInvalid="keep" is set so
# invalid values are kept rather than raising an error.
stringIndexer_State = StringIndexer(
    inputCol="State", outputCol="State_index", handleInvalid="keep"
)
stringIndexer_CustomerZIPCode = StringIndexer(
    inputCol="CustomerZIPCode", outputCol="CustomerZIPCode_index", handleInvalid="keep"
)
stringIndexer_CustomerAge = StringIndexer(
    inputCol="CustomerAge", outputCol="CustomerAge_index", handleInvalid="keep"
)

# --- Index -> one-hot stages -----------------------------------------------
encoder_State = OneHotEncoder(
    inputCol="State_index",
    outputCol="State_encoded",
)
encoder_CustomerZIPCode = OneHotEncoder(
    inputCol="CustomerZIPCode_index",
    outputCol="CustomerZIPCode_encoded",
)
encoder_CustomerAge = OneHotEncoder(
    inputCol="CustomerAge_index",
    outputCol="CustomerAge_encoded",
)

# --- Target column ----------------------------------------------------------
# The "Charity" Yes/No string is indexed into the numeric "label" column.
label_indexer = StringIndexer(inputCol="Charity", outputCol="label", handleInvalid="keep")

# --- Assemble and scale ----------------------------------------------------
# The three encoded columns are concatenated into one vector, which the scaler
# then turns into the final "features" column.
assembler = VectorAssembler(
    inputCols=["State_encoded", "CustomerZIPCode_encoded", "CustomerAge_encoded"],
    outputCol="vectorized_features",
)
scaler = StandardScaler(inputCol="vectorized_features", outputCol="features")

In [6]:
%pyspark
from pyspark.ml import Pipeline

# Stage order: index the categorical columns, one-hot encode them, assemble
# the feature vector, index the label, then scale the assembled vector.
pipeline_stages = Pipeline(
    stages=[
        stringIndexer_State,
        stringIndexer_CustomerZIPCode,
        stringIndexer_CustomerAge,
        encoder_State,
        encoder_CustomerZIPCode,
        encoder_CustomerAge,
        assembler,
        label_indexer,
        scaler,
    ]
)

# NOTE(review): the pipeline is fit on the whole of `simple`, i.e. before any
# train/test split, so fitted stages see rows that later become test data --
# confirm this is acceptable.
pipeline_model = pipeline_stages.fit(simple)
pipeline_df = pipeline_model.transform(simple)
pipeline_df.show(5)

In [7]:
%pyspark
# Same preview as before, minus the unscaled intermediate vector column.
preview_df = pipeline_df.drop("vectorized_features")
preview_df.show(5)

In [8]:
%pyspark
# Persist the transformed frame. The save mode is set to "overwrite" because
# the writer's default mode raises an error when the target path already
# exists, which makes every run after the first one fail at this cell.
# The output path (including its "pipleline" spelling) is kept unchanged so
# that anything already reading from it continues to work.
pipeline_df.write.mode("overwrite").parquet("hdfs://thankgod/pipleline.charitystatus.parquet")

In [9]:
%pyspark
# 80/20 split with a fixed seed, followed by the row count of each part.
train, test = pipeline_df.randomSplit([0.8, 0.2], seed=2018)
for split_name, split_df in (("Training", train), ("Test", test)):
    print(split_name + " Dataset Count: " + str(split_df.count()))

In [10]:
%pyspark
# Row count per "Charity" value in the training split.
class_counts = train.groupBy("Charity").count()
class_counts.show()

In [11]:
%pyspark
from pyspark.ml.classification import LogisticRegression
# Logistic regression on the scaled features, limited to 5 iterations.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=5,
)

# Fit on the training split, then score the held-out test split.
lrModel = lr.fit(train)
predictions = lrModel.transform(test)

In [12]:
%pyspark
# Persist the scored test rows. The save mode is set to "overwrite" because
# the writer's default mode raises an error when the target path already
# exists, which makes every run after the first one fail at this cell.
predictions.write.mode("overwrite").parquet("hdfs://thankgod/predictions.charitystatus.parquet")

In [13]:
%pyspark
# Show the label next to the model outputs for the first rows.
prediction_columns = ['label', 'features', 'rawPrediction', 'prediction', 'probability']
predictions.select(prediction_columns).show()

In [14]:
%pyspark
# Accuracy = rows whose predicted class equals the label / all scored rows.
n_correct = predictions.where(predictions["label"] == predictions["prediction"]).count()
n_scored = predictions.count()
# float() keeps this a true division regardless of the Python version.
accuracy = n_correct / float(n_scored)
print("Accuracy : ",accuracy)

In [16]:
%pyspark
# Print the estimator's parameter documentation.
lr_param_docs = lr.explainParams()
print(lr_param_docs)

In [17]:
%pyspark
import matplotlib.pyplot as plt
import numpy as np
# Plot the fitted coefficients in ascending order.
beta = np.sort(lrModel.coefficients)
ax = plt.gca()
ax.plot(beta)
ax.set_ylabel('Beta Coefficients')
plt.show()

In [18]:
%pyspark
# Print the fitted model's coefficient matrix.
coefficient_matrix = lrModel.coefficientMatrix
print(coefficient_matrix)