<a href="https://colab.research.google.com/github/jchoubey/AMEX-Default-Prediction/blob/main/Amex_Kaggle_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Let's Begin**

Mount Google Drive to access the data

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Option 1: Dask

In [None]:
!python -m pip install "dask[dataframe]" --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting fsspec>=0.6.0
  Downloading fsspec-2022.5.0-py3-none-any.whl (140 kB)
     |████████████████████████████████| 140 kB 5.0 MB/s
Collecting partd>=0.3.10
  Downloading partd-1.2.0-py3-none-any.whl (19 kB)
Collecting locket
  Downloading locket-1.0.0-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: locket, partd, fsspec
Successfully installed fsspec-2022.5.0 locket-1.0.0 partd-1.2.0


In [None]:
# Import libraries
import time
import dask.dataframe as dd

# Read data into a Dask DataFrame (lazy: only a small sample is read here to infer dtypes)
df = dd.read_csv('/content/drive/MyDrive/Data-Science/amex-default-prediction/train_data.csv')

# Measure how long Dask takes to compute the mean of P_2
start = time.perf_counter()

# Build the lazy computation, then trigger it with .compute() (this is when the CSV is actually read)
metric = df.P_2.mean()
print(metric.compute())

end = time.perf_counter()
print(f'Time taken: {end - start:.2f} s')
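To compare the two options fairly, the same timing pattern can be reused for both. Below is a minimal sketch of a hypothetical `timer` context manager (not part of Dask or PySpark). It uses `time.perf_counter`, which is monotonic. With Dask, the work happens only when `.compute()` is called, so that call must sit inside the timed block. With PySpark, the equivalent is an action such as `.show()` or `.collect()`.

```python
import time
from contextlib import contextmanager

@contextmanager
def timer(label, results):
    """Time the enclosed block with a monotonic clock; store the seconds in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start
        print(f'{label}: {results[label]:.2f} s')

# Stand-in workload; in the notebook the body would be e.g. print(df.P_2.mean().compute())
timings = {}
with timer('toy_sleep', timings):
    time.sleep(0.05)
```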

## Option 2: PySpark

In [None]:
# Mount drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
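# Install PySpark
!pip install pyspark

# Start a Spark session ("local[*]" uses one worker thread per core; plain "local" uses a single thread)
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Display DataFrames eagerly (as tables in the notebook) when a cell ends with a DataFrame
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

spark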

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 36 kB/s
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 10.4 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=ce90aaf8ab52893d2ed247d2f885fc2a8919a5d718a8606f9fa6a6b2ce1670b5
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
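# Import the Spark SQL functions used below (explicit names instead of a wildcard import).
# Note: min and max here are Spark column functions and shadow Python's built-ins in this notebook.
from pyspark.sql.functions import col, concat, lit, lag, mean, first, last, min, max
from pyspark.sql.window import Window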

[Rename column in PySpark](https://www.educba.com/pyspark-rename-column/)

[Create new calculated column](https://www.datasciencemadesimple.com/extract-first-n-and-last-n-character-in-pyspark/)

In [None]:
# Load data (without inferSchema, every column is read as a string)
df = spark.read.csv('/content/drive/MyDrive/Data-Science/amex-default-prediction/train_data.csv', header=True)

# Get features (categorical & numerical); S_2 is the date column
features = df.drop('customer_ID', 'S_2').columns
cat_cols = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126', 'D_63', 'D_64', 'D_66', 'D_68']
num_cols = [c for c in features if c not in cat_cols]

# Pre-process the time period: turn each 'YYYY-MM-DD' date into the first day of its month
df = df.withColumnRenamed('S_2', 'Month')
df = df.withColumn('Month', concat(df.Month.substr(1, 8), lit('01')))

# Work on a single column first to keep iterations fast; the same steps will later be applied to all columns.
# P_2 is cast to double because the CSV was read without a schema.
df_sample = df.select('customer_ID', 'Month', col('P_2').cast('double').alias('P_2'))
df_sample.show(1)

+--------------------+----------+------------------+
|         customer_ID|     Month|               P_2|
+--------------------+----------+------------------+
|0000099d6bd597052...|2017-03-01|0.9384687191272548|
+--------------------+----------+------------------+
only showing top 1 row
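The `substr`/`concat` step works only because `S_2` is a zero-padded `YYYY-MM-DD` string. Spark's `substr(1, 8)` is 1-based and keeps `YYYY-MM-`. Below is a plain-Python sketch (hypothetical helper names, toy dates) that checks this step against date-based truncation. In Spark, the date-based version would be `trunc(to_date('S_2'), 'month')`, which yields a date column instead of a string.

```python
from datetime import date

def month_start_from_substring(s2):
    # Mirrors concat(substr(1, 8), lit('01')): keep 'YYYY-MM-' and append '01'
    return s2[:8] + '01'

def month_start_from_date(s2):
    # Parses the date and resets the day to 1, like trunc(..., 'month') in Spark
    return date.fromisoformat(s2).replace(day=1).isoformat()

# Hypothetical statement dates in the S_2 format shown above
samples = ['2017-03-09', '2018-02-28', '2017-12-31']
for s in samples:
    print(s, month_start_from_substring(s), month_start_from_date(s))
```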



[Calculate Aggregates of column in PySpark](https://stackoverflow.com/questions/53389938/pyspark-how-to-calculate-min-max-value-of-each-field-using-pyspark#:~:text=There%20are%20different%20functions%20you%20can%20use%20to,%28%22col_1%22%29%29%2C%20min%20%28col%20%28%22col_2%22%29%29%2C%20max%20%28col%20%28%22col_2%22%29%29%29.show%20%28%29)

In [None]:
# Check the range of months in the data
df_sample.agg(min(col("Month")), max(col("Month"))).show()

# Check whether every customer's history spans the same months (first and last month per customer)
cust_agg = df_sample.groupBy('customer_ID').agg(min(col("Month")), max(col("Month")))
cust_agg.show(30)

+----------+----------+
|min(Month)|max(Month)|
+----------+----------+
|2017-03-01|2018-03-01|
+----------+----------+

+--------------------+----------+----------+
|         customer_ID|min(Month)|max(Month)|
+--------------------+----------+----------+
|0000099d6bd597052...|2017-03-01|2018-03-01|
|00000fd6641609c6e...|2017-03-01|2018-03-01|
|00001b22f846c82c5...|2017-03-01|2018-03-01|
|000041bdba6ecadd8...|2017-03-01|2018-03-01|
|00007889e4fcd2614...|2017-03-01|2018-03-01|
|000084e5023181993...|2017-03-01|2018-03-01|
|000098081fde4fd64...|2017-03-01|2018-03-01|
|0000d17a1447b25a0...|2017-03-01|2018-03-01|
|0000f99513770170a...|2017-03-01|2018-03-01|
|00013181a0c5fc8f1...|2017-03-01|2018-03-01|
|0001337ded4e1c253...|2018-01-01|2018-03-01|
|00013c6e1cec7c21b...|2017-03-01|2018-03-01|
|0001812036f155833...|2017-03-01|2018-03-01|
|00018dd4932409baf...|2017-03-01|2018-03-01|
|000198b3dc70edd65...|2017-03-01|2018-03-01|
|000201146e53cacdd...|2017-03-01|2018-03-01|
|0002d381bdd8048d7...|20
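The first and last month alone do not show whether a customer has missing months. The output above already includes one customer whose history starts in 2018-01. Below is a plain-Python sketch (hypothetical helper and toy rows) that also counts distinct months and flags gaps. In Spark, adding `countDistinct('Month')` to the `agg(...)` call above would give the count directly.

```python
from collections import defaultdict

def month_coverage(rows):
    """rows: iterable of (customer_ID, 'YYYY-MM-01') pairs.
    Returns {customer_ID: (n_distinct_months, first_month, last_month, has_gap)}."""
    months = defaultdict(set)
    for cust, month in rows:
        months[cust].add(month)
    coverage = {}
    for cust, ms in months.items():
        ordered = sorted(ms)  # avoids min/max, which the Spark imports shadow in this notebook
        first_m, last_m = ordered[0], ordered[-1]
        # Calendar months from first to last inclusive, e.g. 2018-01 .. 2018-03 -> 3
        span = (int(last_m[:4]) - int(first_m[:4])) * 12 + int(last_m[5:7]) - int(first_m[5:7]) + 1
        coverage[cust] = (len(ms), first_m, last_m, len(ms) < span)
    return coverage

# Hypothetical toy rows: customer 'b' has no statement in 2018-02
rows = [('a', '2017-03-01'), ('a', '2017-04-01'), ('a', '2017-05-01'),
        ('b', '2018-01-01'), ('b', '2018-03-01')]
cov = month_coverage(rows)
print(cov)
```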

In [None]:
# Null-value treatment:
# 1. Compute the fraction of null values in each column
# 2. Drop columns in which more than 50% of the records are null
# 3. In the remaining columns, replace null values with 0
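Steps 1 to 3 can be sketched in plain Python on a list of row dicts before they are translated to Spark. The helper name `null_treatment` and the column `mostly_null` are hypothetical. In Spark, the null fractions could be computed in one pass with `count(when(col(c).isNull(), c)) / count(lit(1))` per column, followed by `df.drop(*dropped)` and `df.fillna(0)`. One caveat: `fillna(0)` only fills columns with a numeric type. Because the CSV here was read with every column as a string, those columns would need to be cast to numeric first.

```python
def null_treatment(rows, feature_cols, max_null_frac=0.5, fill_value=0):
    """rows: list of dicts where None marks a null value.
    Drops feature columns whose null fraction exceeds max_null_frac,
    then replaces the remaining nulls with fill_value."""
    n_rows = len(rows)
    # len([...]) instead of sum(...): sum is shadowed if pyspark.sql.functions is wildcard-imported
    null_frac = {c: len([r for r in rows if r.get(c) is None]) / n_rows for c in feature_cols}
    dropped = {c for c in feature_cols if null_frac[c] > max_null_frac}
    cleaned = [
        {k: (fill_value if v is None else v) for k, v in r.items() if k not in dropped}
        for r in rows
    ]
    return null_frac, dropped, cleaned

# Hypothetical toy rows: 'mostly_null' is null in 2 of 3 rows, P_2 in 1 of 3
rows = [
    {'customer_ID': 'a', 'P_2': 0.9, 'mostly_null': None},
    {'customer_ID': 'a', 'P_2': None, 'mostly_null': None},
    {'customer_ID': 'b', 'P_2': 0.8, 'mostly_null': 1.0},
]
null_frac, dropped, cleaned = null_treatment(rows, ['P_2', 'mostly_null'])
print(null_frac, dropped, cleaned)
```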

[Link: Aggregating multiple columns on multiple criteria in PySpark](https://stackoverflow.com/questions/62620453/pyspark-groupby-and-aggregate-avg-and-first-on-multiple-columns)

In [None]:
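from pyspark.sql.functions import mean, min_by, max_by  # min_by/max_by require Spark >= 3.3

# Aggregate each customer's (up to) 13 months of data, 2017-03 to 2018-03: mean, first and last value.
# first()/last() are non-deterministic after a groupBy shuffle, so the first and last values
# are taken by ordering on Month with min_by/max_by instead.
cols_list = ['P_2']
expr = []
for c in cols_list:
    expr += [mean(c).alias('mean_' + c),
             min_by(c, 'Month').alias('first_' + c),
             max_by(c, 'Month').alias('last_' + c)]
df_sample_grouped = df_sample.groupBy('customer_ID').agg(*expr)
df_sample_grouped.show(5)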
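Spark's `first` and `last` aggregates return whichever value the group happens to see first or last. The Spark documentation describes them as non-deterministic because row order after a shuffle is not guaranteed. The intended meaning is the value in the earliest and in the latest month. Below is a plain-Python sketch of that meaning, which sorts each customer's rows by month before taking the first and last value. The helper name is hypothetical, and the toy rows are deliberately out of order. In Spark 3.3+, `min_by('P_2', 'Month')` and `max_by('P_2', 'Month')` express the same thing.

```python
import math
from collections import defaultdict

def first_last_mean(rows):
    """rows: (customer_ID, month, value) tuples in any order.
    Returns {customer_ID: (mean, value_in_earliest_month, value_in_latest_month)}."""
    by_cust = defaultdict(list)
    for cust, month, value in rows:
        by_cust[cust].append((month, value))
    result = {}
    for cust, pairs in by_cust.items():
        pairs.sort()  # 'YYYY-MM-DD' strings sort chronologically
        values = [v for _, v in pairs]
        result[cust] = (math.fsum(values) / len(values), values[0], values[-1])
    return result

# Hypothetical toy rows, deliberately out of month order
rows = [('a', '2017-05-01', 3.0), ('a', '2017-03-01', 1.0), ('a', '2017-04-01', 2.0),
        ('b', '2018-02-01', 5.0), ('b', '2018-01-01', 4.0)]
summary = first_last_mean(rows)
print(summary)
```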

[Lags using Window function in PySpark](https://www.educba.com/pyspark-lag/)

In [None]:
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("customer_ID").orderBy("Month")
df_sample = df_sample.withColumn("lag", lag("P_2",1).over(windowSpec))

In [None]:
df_sample.show(2)

+--------------------+----------+------------------+------------------+
|         customer_ID|     Month|               P_2|               lag|
+--------------------+----------+------------------+------------------+
|00000fd6641609c6e...|2017-03-01|0.9291219156224948|              null|
|00000fd6641609c6e...|2017-04-01|0.9184305365007384|0.9291219156224948|
+--------------------+----------+------------------+------------------+
only showing top 2 rows
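The next-steps cell mentions lags 1-4, while the cell above computes only lag 1. Below is a plain-Python sketch of what lags 1..k look like for one customer (hypothetical helper `add_lags`, toy values). In Spark, the same columns could be produced by looping `df_sample.withColumn(f'P_2_lag{k}', lag('P_2', k).over(windowSpec))` for k from 1 to 4, reusing the `windowSpec` defined above.

```python
def add_lags(series, max_lag=4):
    """series: one customer's values ordered by month.
    Returns one row per month: [value, lag_1, ..., lag_max_lag], with None where no
    earlier month exists (Spark's lag() returns null in the same situation)."""
    rows = []
    for i, value in enumerate(series):
        lags = [series[i - k] if i - k >= 0 else None for k in range(1, max_lag + 1)]
        rows.append([value] + lags)
    return rows

# Hypothetical P_2-like values for one customer, already ordered by month
lagged = add_lags([0.93, 0.92, 0.95], max_lag=2)
for row in lagged:
    print(row)
```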



In [None]:
#### Next Steps -------------------------------------------------------------------

# Null values: drop columns in which more than 50% of records are null
# Null-value treatment for the remaining columns (at most 50% of records null)

# Months: 2017-03 to 2018-03 (13 months)
# Not all customers have 13 months of data: what should we consider when computing aggregates?

# Numerical columns: mean, first, last and lags 1-4 were calculated.
# Also look into: velocity, growth, acceleration, jerk, moving average, etc.

# Categorical column treatment: 
# The 11 columns in cat_cols are categorical (0/1). How should they be aggregated? E.g., count, min, max.

# Run the skeleton code over the entire training data
# Google Colab is not well suited to long-running jobs because it is interactive; which IDE or environment should we use instead?
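The derived features listed above are not yet defined in the notebook. One common set of definitions, which is an assumption and not necessarily what the project will adopt, is:

- velocity: the month-over-month difference;
- acceleration and jerk: the second and third differences;
- growth: the relative change;
- moving average: a trailing moving average.

Below is a plain-Python sketch of these on a single customer's toy series, plus a few candidate categorical aggregates. All helper names are hypothetical. In Spark, velocity could be `col('P_2') - lag('P_2', 1).over(windowSpec)` and a 3-month moving average could be `avg('P_2').over(windowSpec.rowsBetween(-2, 0))`.

```python
import math
from collections import Counter

def diff(xs):
    """First difference x_t - x_{t-1}; None where either value is missing."""
    return [None] + [b - a if a is not None and b is not None else None
                     for a, b in zip(xs, xs[1:])]

def growth(xs):
    """Relative change (x_t - x_{t-1}) / x_{t-1}; None when undefined."""
    return [None] + [(b - a) / a if a not in (None, 0) and b is not None else None
                     for a, b in zip(xs, xs[1:])]

def moving_average(xs, window=3):
    """Trailing mean of the last `window` values (fewer at the start of the series)."""
    out = []
    for i in range(len(xs)):
        start = i - window + 1 if i - window + 1 > 0 else 0
        chunk = xs[start:i + 1]
        out.append(math.fsum(chunk) / len(chunk))
    return out

def categorical_aggs(values):
    """Count, number of distinct codes, last value and most frequent code (None = missing)."""
    present = [v for v in values if v is not None]
    return {
        'count': len(present),
        'nunique': len(set(present)),
        'last': values[-1] if values else None,
        'mode': Counter(present).most_common(1)[0][0] if present else None,
    }

# Hypothetical monthly series for one customer
x = [1.0, 2.0, 4.0, 8.0]
velocity = diff(x)              # [None, 1.0, 2.0, 4.0]
acceleration = diff(velocity)   # [None, None, 1.0, 2.0]
jerk = diff(acceleration)       # [None, None, None, 1.0]
print(velocity, acceleration, jerk, growth(x), moving_average(x))
print(categorical_aggs([0, 1, 1, 0, 1]))
```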