## Disclaimer
This notebook does not aim for high performance. Its goal is to propose a Spark-based approach that introduces people to distributed computing and encourages this kind of implementation, which is representative of industry practice.

In this notebook, we introduce PySpark, the Python interface for Apache Spark, a framework dedicated to distributed computation.

In [1]:
import numpy as np
import pandas as pd 
import random
import os



PySpark's strength is its simplicity: with the DataFrame API we do not need to manipulate Resilient Distributed Datasets (RDDs), the basic units of data that Spark distributes across the cluster, because PySpark handles them for us. If you do want to program at that level, you can still do it easily.

## Resilient Distributed Dataset
- An RDD (Resilient Distributed Dataset) is Spark's basic abstraction: an immutable collection of elements partitioned across the nodes of the cluster, which allows computations to run in parallel.
- It has three main advantages:
    - Performance.
    - Consistency.
    - Fault tolerance.

PySpark supports most Spark features, such as DataFrames, Spark SQL and MLlib. We will not cover Spark Streaming in this tutorial.

## SparkSQL 
- Simply put, Spark SQL lets us query structured data inside Spark programs. It acts as a distributed SQL query engine for fast data retrieval and processing. We can also write SQL queries against a DataFrame by registering it as a temporary view, as we will do at the end of the tutorial.
- It provides a programming abstraction called DataFrame:
    - Like a pandas DataFrame, a PySpark DataFrame is a table with named columns, but its rows can be distributed across multiple machines of the cluster.

## MLlib
- MLlib is a scalable machine learning library that provides a uniform set of high-level APIs to help users create and tune practical machine learning pipelines. For this competition, we use tree-based algorithms.



In [2]:
# Install pyspark
!pip3 install pyspark

Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.3/281.3 MB 4.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=3b02e5cc91158c80712e3a9cc180d6bc3f14933f1fca1da615042e9067b65536
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.3.0

In [3]:
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

`SparkSession` is the entry point for working with RDDs and DataFrames in PySpark. When building it, we can also configure properties such as:

 - `spark.driver.memory`: amount of memory for the driver process, i.e. the process in which the session is initialized. Here it is set to 16 GB.
 - `spark.sql.shuffle.partitions`: number of partitions to use when shuffling data for joins or aggregations. Here it is set to 20 (Spark's default is 200).

In [4]:
spark = (SparkSession.builder.master("local[*]")
                    .config("spark.driver.memory","16g")
                    .config("spark.sql.shuffle.partitions",20)
                    .appName('PysparkIsAllYouNeed')
                    .getOrCreate())


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/07 06:52:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
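def sample_df(n_cols,fraction_rows,path):
    """Read a parquet file, keep customer_ID, n_cols randomly chosen feature
    columns and target, then sample a fraction of the rows."""
    print("using PySpark ...")
    parDF1 = spark.read.parquet(path)
    # columns[1:-1] assumes customer_ID is the first column and target the last
    cols_list = ["customer_ID"] + random.sample(parDF1.columns[1:-1],n_cols) + ['target']
    # Row-level sampling: a customer may keep only some of its rows
    parDF1 = parDF1.select(*cols_list).sample(fraction=fraction_rows)
    return parDF1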

I observed that using all the columns makes the kernel crash, so I randomly select a subset of columns while looking for a better solution.
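Because `sample_df` draws the columns with the module-level `random.sample` and no seed, the subset changes from run to run. A sketch of a reproducible variant: `sample_columns` is a hypothetical helper that uses a private, seeded `random.Random`, and the column list is a made-up example with `customer_ID` first and `target` last.

```python
import random

def sample_columns(columns, n_cols, seed=None):
    """Hypothetical helper mirroring the column selection in sample_df:
    keep the first column (customer_ID) and the last one (target), and draw
    n_cols of the columns in between. A fixed seed gives the same draw."""
    rng = random.Random(seed)  # private generator; the global random state is untouched
    return [columns[0]] + rng.sample(columns[1:-1], n_cols) + [columns[-1]]

# Hypothetical column list: customer_ID first, target last
cols = ["customer_ID", "P_2", "D_39", "B_1", "B_2", "R_1", "S_3", "target"]
first = sample_columns(cols, 3, seed=42)
second = sample_columns(cols, 3, seed=42)
print(first)
```

For the row sampling, `DataFrame.sample` also accepts a `seed` argument.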

In [6]:
train_sample = sample_df(n_cols=40,fraction_rows=0.2,path="../input/amex-parquet/train_data.parquet")
train_sample.count(),len(train_sample.columns)

using PySpark ...

(1105416, 42)

In [7]:
cols_used = train_sample.columns

### Preprocessing

In [8]:
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import confusion_matrix

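For the sake of the tutorial, I apply very simple preprocessing and aggregation: every numerical column is summarized per customer by its sum, minimum, sample standard deviation, mean and maximum. Further preprocessing can easily improve the results.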

### Aggregation function

In [9]:
def agg_data(train_sample,option="train"):
    # Select only numerical columns for aggregation: skip customer_ID (first
    # column), the target (last column, train only) and string columns
    if option == "train":
        numerical_list = [item[0] for item in train_sample.dtypes[1:-1] if not item[1].startswith('string')]
    else:
        numerical_list = [item[0] for item in train_sample.dtypes[1:] if not item[1].startswith('string')]
    print(numerical_list)
    # One {column: aggregate function} dict per statistic
    sum_agg = {x: "sum" for x in numerical_list if x != train_sample.columns[0]}
    min_agg = {x: "min" for x in numerical_list if x != train_sample.columns[0]}
    std_agg = {x: "stddev_samp" for x in numerical_list if x != train_sample.columns[0]}
    #variance_agg = {x: "variance" for x in numerical_list if x != train_sample.columns[0]}
    avg_agg = {x: "mean" for x in numerical_list if x != train_sample.columns[0]}
    if option == "train":
        # max(target) recovers each customer's label (assumed constant per customer)
        max_agg = {x: "max" for x in numerical_list + ['target'] if x != train_sample.columns[0]}
    else:
        max_agg = {x: "max" for x in numerical_list if x != train_sample.columns[0]}

    sum_df = train_sample.groupBy("customer_ID").agg(sum_agg).withColumnRenamed("customer_ID","customer_ID_1")
    min_df = train_sample.groupBy("customer_ID").agg(min_agg).withColumnRenamed("customer_ID","customer_ID_2")
    std_df = train_sample.groupBy("customer_ID").agg(std_agg).withColumnRenamed("customer_ID","customer_ID_3")
    #variance_df = train_sample.groupBy("customer_ID").agg(variance_agg).withColumnRenamed("customer_ID","customer_ID_4")
    avg_df = train_sample.groupBy("customer_ID").agg(avg_agg).withColumnRenamed("customer_ID","customer_ID_5")
    max_df = train_sample.groupBy("customer_ID").agg(max_agg).withColumnRenamed("customer_ID","customer_ID_6")

    join1 = sum_df.join(min_df, sum_df.customer_ID_1 == min_df.customer_ID_2, 'inner').select("*")
    join2 = join1.join(std_df, join1.customer_ID_1 == std_df.customer_ID_3, 'inner').select("*")
    #join3 = join2.join(variance_df, join2.customer_ID_1 == variance_df.customer_ID_4, 'inner').select("*")

    join4 = join2.join(avg_df, join2.customer_ID_1 == avg_df.customer_ID_5, 'inner').select("*")

    join_final = join4.join(max_df, join4.customer_ID_1 == max_df.customer_ID_6, 'inner').select("*")

    join_final = join_final.drop(*["customer_ID_{}".format(x) for x in range(2,7)]).withColumnRenamed("customer_ID_1","customer_ID")

    return join_final

### Preprocessing: feature vectorization

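Note that MLlib models expect their input in the form (X, y), where X is a single vector column holding all the features and y is the label column. `VectorAssembler` builds that vector column from the individual feature columns; with `handleInvalid="keep"`, missing values are kept as NaN in the vector instead of raising an error.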

In [10]:
def preprocess_data(df,option="train"):

    if option =="train":
        agg_df = agg_data(df,option="train")
        print("training ...")
        agg_df = agg_df.withColumnRenamed("max(target)", "target")
        input_cols = agg_df.select("*").drop("customer_ID","target").columns
        va = VectorAssembler(inputCols = input_cols, outputCol='features',handleInvalid = "keep")
        va_df = va.transform(agg_df)
        va_df = va_df.select(['features', 'target'])  
    else:
        print("testing ...")
        agg_df = agg_data(df,option="test")
        input_cols = agg_df.select("*").drop("customer_ID").columns
        va = VectorAssembler(inputCols = input_cols, outputCol='features',handleInvalid = "keep")
        va_df = va.transform(agg_df)
        va_df = va_df.select(['customer_ID','features']) 
    
          
    return va_df

In [11]:
train_data_agg_vectorized  = preprocess_data(train_sample,"train")
train_data_agg_vectorized.count(),len(train_data_agg_vectorized.columns)

['D_82', 'D_121', 'B_6', 'R_28', 'S_18', 'D_88', 'R_27', 'B_2', 'D_132', 'S_16', 'D_138', 'D_113', 'S_13', 'R_18', 'R_15', 'D_70', 'B_12', 'B_39', 'B_26', 'D_73', 'R_17', 'B_37', 'D_39', 'B_13', 'D_139', 'D_109', 'D_81', 'D_123', 'B_27', 'D_83', 'S_22', 'D_72', 'S_9', 'D_96', 'R_13', 'B_41', 'D_143', 'S_24']
training ...

(417202, 2)

In [12]:
# Split the data randomly
(train, valid) = train_data_agg_vectorized.randomSplit([0.7, 0.3])


### Model & evaluation

In [13]:
## Alternative: Random Forest (uncomment to use it instead of a single tree)
# dtc = RandomForestClassifier(numTrees=20,
#       maxDepth=8,subsamplingRate=0.8,
#       featureSubsetStrategy= "sqrt",
#       featuresCol="features", labelCol="target")

# Single decision tree with a maximum depth of 10
dtc = DecisionTreeClassifier(maxDepth=10,featuresCol="features", labelCol="target")
model = dtc.fit(train)




22/08/07 06:53:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/08/07 06:54:16 WARN DAGScheduler: Broadcasting large task binary with size 1033.5 KiB
22/08/07 06:56:37 WARN DAGScheduler: Broadcasting large task binary with size 1005.1 KiB
22/08/07 06:56:39 WARN DAGScheduler: Broadcasting large task binary with size 1021.0 KiB
22/08/07 06:56:40 WARN DAGScheduler: Broadcasting large task binary with size 1049.7 KiB
22/08/07 06:56:42 WARN DAGScheduler: Broadcasting large task binary with size 1100.3 KiB

#### Evaluation

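Because the dataset is imbalanced, we evaluate the model with `areaUnderROC` from `BinaryClassificationEvaluator`. Note that `rawPredictionCol` is set below to the hard 0/1 `prediction` column; the evaluator's default, `rawPrediction`, ranks customers by the model's scores and generally gives a more informative AUC.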
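To illustrate why the score column matters, here is a pure-Python sketch on hypothetical toy labels and probabilities. It computes the AUC with the pairwise (Mann-Whitney) formulation, an O(n²) illustration rather than Spark's implementation, once from the probabilities and once from hard labels thresholded at 0.5. With 0/1 scores the ROC curve has a single interior point, and the AUC reduces to (TPR + TNR) / 2.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs in which the
    positive gets the higher score; ties count as one half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Hypothetical toy labels and predicted probabilities of class 1
labels = [0, 0, 0, 1, 1, 0, 1, 1]
probs = [0.1, 0.3, 0.45, 0.4, 0.8, 0.6, 0.55, 0.9]
hard = [1.0 if p > 0.5 else 0.0 for p in probs]  # like a hard `prediction` column

auc_scores = roc_auc(labels, probs)  # 13 winning pairs out of 16
auc_hard = roc_auc(labels, hard)     # TPR = TNR = 3/4, so (3/4 + 3/4) / 2
print(auc_scores, auc_hard)          # 0.8125 0.75
```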

It's also important to note that `.collect()` gathers all the data distributed across the cluster onto the driver, i.e. this kernel. This is usually not recommended, because the driver's memory generally cannot hold that much data.

In [14]:
# rawPredictionCol="prediction": the AUC is computed from hard 0/1 labels
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="target",metricName="areaUnderROC")
pred = model.transform(valid)
acc = evaluator.evaluate(pred)  # area under the ROC curve

print("Prediction auc: ", acc)



22/08/07 06:57:06 WARN DAGScheduler: Broadcasting large task binary with size 1042.0 KiB

Prediction auc:  0.7613264801472656

In [15]:
# Confusion matrix, left disabled: .collect() brings every row to the driver.
# Both columns are collected together so that labels and predictions stay aligned.
# rows = pred.select("target", "prediction").collect()
# y_orig = [row["target"] for row in rows]
# y_pred = [row["prediction"] for row in rows]
#
# cm = confusion_matrix(y_orig, y_pred)
# print("Confusion Matrix:")
# print(cm)

In [16]:
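# Free memory: drop Python references that are no longer needed
del train, valid, evaluator, acc, train_sample
# Some of these names exist only if the confusion-matrix cell was run
for name in ["train_data_agg_vectorized", "cm", "y_pred", "y_orig", "pred"]:
    globals().pop(name, None)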

## Testing

In [17]:
test_path = "../input/amex-parquet/test_data.parquet"
test_cols = cols_used[:-1]  # same columns as in training, without the target
test_data = spark.read.parquet(test_path).select(*test_cols)


In [18]:
test_data_agg_vectorized  = preprocess_data(test_data,"test")

test_data_agg_vectorized.count(),len(test_data_agg_vectorized.columns)

testing ...
['D_82', 'D_121', 'B_6', 'R_28', 'S_18', 'D_88', 'R_27', 'B_2', 'D_132', 'S_16', 'D_138', 'D_113', 'S_13', 'R_18', 'R_15', 'D_70', 'B_12', 'B_39', 'B_26', 'D_73', 'R_17', 'B_37', 'D_39', 'B_13', 'D_139', 'D_109', 'D_81', 'D_123', 'B_27', 'D_83', 'S_22', 'D_72', 'S_9', 'D_96', 'R_13', 'B_41', 'D_143', 'S_24']

(924621, 2)

In [19]:
test_pred = model.transform(test_data_agg_vectorized)

In [20]:
test_pred.show()



22/08/07 07:00:18 WARN DAGScheduler: Broadcasting large task binary with size 1017.6 KiB

+--------------------+--------------------+-----------------+--------------------+----------+
|         customer_ID|            features|    rawPrediction|         probability|prediction|
+--------------------+--------------------+-----------------+--------------------+----------+
|0000210045da4f81e...|[0.06097930658143...|   [3002.0,635.0]|[0.82540555402804...|       0.0|
|00003b41e58ede33b...|[0.05114417144795...|  [3122.0,1937.0]|[0.61711800751136...|       0.0|
|00004ffe6e01e1b68...|[0.07659108197549...|[111749.0,2306.0]|[0.97978168427513...|       0.0|
|00031400ff14b7d1f...|[0.06699633877724...|[111749.0,2306.0]|[0.97978168427513...|       0.0|
|0003e5c8c3d0be014...|[0.07874763966538...|[111749.0,2306.0]|[0.97978168427513...|       0.0|
|00044da6bd1122c29...|[0.08059975429205...|   [7734.0,794.0]|[0.90689493433395...|       0.0|
|00048ea2f1b75bb67...|[0.06202506215777...|    [199.0,536.0]|[0.27074829931972...|       1.0|
|000ada1377b0e5267...|[0.08360742888180...|     [10.0,214.0]

We use User-Defined Functions (UDFs), which let us write functions in Python and apply them to DataFrame columns or inside Spark SQL queries. The `probability` column holds a two-element vector, [P(target = 0), P(target = 1)], so each UDF below extracts one of its elements as a separate column.

In [21]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col

def extract_prob_0(v):
    # v is the probability vector [P(target = 0), P(target = 1)]
    try:
        return float(v[0])
    except (TypeError, IndexError, ValueError):  # null or malformed vector
        return None

def extract_prob_1(v):
    try:
        return float(v[1])
    except (TypeError, IndexError, ValueError):
        return None


extract_prob0_udf = udf(extract_prob_0, DoubleType())
extract_prob1_udf = udf(extract_prob_1, DoubleType())



probabilities = (test_pred.select("customer_ID","probability").withColumn("prob_0", extract_prob0_udf(col("probability")))
                                                             .withColumn("prob_1", extract_prob1_udf(col("probability"))))

In [22]:
probabilities.select("prob_0","prob_1").show(5)



22/08/07 07:02:21 WARN DAGScheduler: Broadcasting large task binary with size 1017.0 KiB

+------------------+-------------------+
|            prob_0|             prob_1|
+------------------+-------------------+
| 0.825405554028045|0.17459444597195492|
|0.6171180075113659|0.38288199248863414|
|0.9797816842751305|0.02021831572486958|
|0.9797816842751305|0.02021831572486958|
|0.9797816842751305|0.02021831572486958|
+------------------+-------------------+
only showing top 5 rows

### Some SparkSQL

In [23]:
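# SQLContext is deprecated since Spark 3.0; spark.sql(...) on the SparkSession
# provides the same SQL entry point
sqlContext = SQLContext(spark)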



In [24]:
probabilities.createOrReplaceTempView("tmp_probabilities")

sample_submission = sqlContext.sql(
"""
SELECT 
customer_ID,
-- prob_0 + prob_1 = 1, so both branches return prob_1 (up to rounding)
CASE WHEN prob_0 > prob_1 THEN 1-prob_0
          ELSE prob_1
END as prediction
FROM tmp_probabilities
"""
)
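A quick pure-Python check of the `CASE` expression, using (prob_0, prob_1) pairs copied from the `probabilities.select("prob_0","prob_1").show(5)` output: since the two probabilities sum to 1, `1 - prob_0` and `prob_1` agree up to floating-point rounding, so selecting `prob_1 AS prediction` alone would give the same submission. `case_when_prediction` is a hypothetical mirror of the SQL, not part of the notebook.

```python
def case_when_prediction(prob_0, prob_1):
    # Mirror of: CASE WHEN prob_0 > prob_1 THEN 1-prob_0 ELSE prob_1 END
    return 1 - prob_0 if prob_0 > prob_1 else prob_1

# (prob_0, prob_1) pairs from the show(5) output
pairs = [
    (0.825405554028045, 0.17459444597195492),
    (0.6171180075113659, 0.38288199248863414),
    (0.9797816842751305, 0.02021831572486958),
]
for prob_0, prob_1 in pairs:
    diff = abs(case_when_prediction(prob_0, prob_1) - prob_1)
    print(f"prob_1={prob_1:.17f}  difference={diff:.1e}")
```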

In [25]:
sample_submission.show(5)



22/08/07 07:04:41 WARN DAGScheduler: Broadcasting large task binary with size 1018.4 KiB

+--------------------+-------------------+
|         customer_ID|         prediction|
+--------------------+-------------------+
|0000210045da4f81e...|0.17459444597195495|
|00003b41e58ede33b...| 0.3828819924886341|
|00004ffe6e01e1b68...|0.02021831572486954|
|00031400ff14b7d1f...|0.02021831572486954|
|0003e5c8c3d0be014...|0.02021831572486954|
+--------------------+-------------------+
only showing top 5 rows

#### Some sanity checks

In [26]:
# Each customer_ID should appear exactly once in the submission
sample_submission.groupBy("customer_ID").agg(F.count("customer_ID")).count() == sample_submission.count()

True

In [27]:
# Predictions are probabilities, so they should lie in [0, 1]
sample_submission.select("prediction").agg(F.max("prediction"),F.min("prediction")).show()



22/08/07 07:07:17 WARN DAGScheduler: Broadcasting large task binary with size 1124.4 KiB




+---------------+---------------+
|max(prediction)|min(prediction)|
+---------------+---------------+
|            1.0|            0.0|
+---------------+---------------+

In [28]:
# Convert the Spark DataFrame to a pandas DataFrame (this collects every partition onto the driver)
final_submission_df = sample_submission.toPandas()



22/08/07 07:10:13 WARN DAGScheduler: Broadcasting large task binary with size 1018.3 KiB

In [29]:
target_path = "/kaggle/working/sample_submission.csv"

final_submission_df.to_csv(target_path, index=False)  # writes the file; returns None when given a path


In [30]:
final_submission_df

,customer_ID,prediction
0,0000210045da4f81e5f122c6bde5c2a617d03eef67f82c...,0.174594
1,00003b41e58ede33b8daf61ab56d9952f17c9ad1c3976c...,0.382882
2,00004ffe6e01e1b688170bbd108da8351bc4c316eacfef...,0.020218
3,00031400ff14b7d1f113668cc7397b1ebd9c46cce1d14f...,0.020218
4,0003e5c8c3d0be014d653748cfd93a26be994a5508e1c1...,0.020218
...,...,...
924616,fff64f5763dd3eb07081f360cba18d82aea1487cbf292c...,0.020218
924617,fffa8409c201b2a7c6ee5694965d3372d3bbe4c25f8bf3...,0.810811
924618,fffb015266e480ba5eb8d8297ca41f2fc75cd4d369306f...,0.955357
924619,fffdf0264d14645b3e369401f5a89242139d2a7aa317b5...,0.020218
