# Feature Engineering using Spark

To build a Lifetime Revenue (LTR) model, we need to aggregate the data at the customer level and extract features from the 320 million available transactions. We do this with PySpark, running the script through SageMaker's PySparkProcessor class.
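To make "aggregate at the customer level" concrete, here is a minimal pandas sketch on a toy transaction table. It is not the author's Spark script, which is not shown; in Spark the same step would presumably be a `groupBy("id").agg(...)`. The raw column names `id`, `date` and `purchaseamount`, and the use of the latest date in the data as the reference date, are assumptions. The two ratio columns follow a pattern that the printed feature table further down is consistent with: for the first customer, 120 / 213 ≈ 0.56338 (`unique_dates_to_days`) and 2295.73 / 213 ≈ 10.778 (`avg_daily_amount`).

```python
import numpy as np
import pandas as pd

# Toy transactions; raw column names are assumptions, not the real schema.
tx = pd.DataFrame({
    "id": [1, 1, 1, 2, 2],
    "date": pd.to_datetime(["2013-01-01", "2013-01-01", "2013-01-11",
                            "2013-01-05", "2013-01-21"]),
    "purchaseamount": [2.0, 4.0, 6.0, 10.0, 5.0],
})

# Assumed reference date: the latest date present in the data.
ref_date = tx["date"].max()

# One row per customer, using pandas named aggregation.
features = tx.groupby("id").agg(
    num_unique_date=("date", "nunique"),
    max_transaction_amount=("purchaseamount", "max"),
    sum_amount=("purchaseamount", "sum"),
    avg_transaction_amount=("purchaseamount", "mean"),
    last_date=("date", "max"),
    first_date=("date", "min"),
)
features["days_since_last_transaction"] = (ref_date - features["last_date"]).dt.days
features["days_since_first_transaction"] = (ref_date - features["first_date"]).dt.days
features = features.drop(columns=["last_date", "first_date"])

# Ratios over the observed history; a zero-day history yields NaN instead of inf.
history_days = features["days_since_first_transaction"].replace(0, np.nan)
features["avg_daily_amount"] = features["sum_amount"] / history_days
features["unique_dates_to_days"] = features["num_unique_date"] / history_days
print(features)
```

Here customer 1 has 2 distinct dates over a 20-day history, giving `unique_dates_to_days` = 0.1 and `avg_daily_amount` = 12 / 20 = 0.6.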

In [1]:
import io
import gc
from datetime import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

import sagemaker
import boto3

In [None]:
# session and role
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = "capstone-transaction-data"
input_key = "preprocessed_data/test_transactions.parquet"
output_key = "processed_data/test_features.parquet"

## 1. PySpark Processing job

The PySpark script is located at spark/feature_eng.py. Here we only launch it as a processing job with SageMaker's PySparkProcessor.
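`feature_eng.py` itself is not reproduced in this notebook. The `arguments` list passed to `run()` in the next cell implies that the script reads three command-line flags. A hypothetical sketch of that entry point with `argparse` could look like this; the `parse_args` helper and the `s3://` URI layout are assumptions, not the author's code.

```python
import argparse


def parse_args(argv=None):
    """Hypothetical CLI for feature_eng.py, mirroring the flags passed by PySparkProcessor.run()."""
    parser = argparse.ArgumentParser(description="Customer-level feature engineering")
    parser.add_argument("--s3_bucket", required=True)
    parser.add_argument("--s3_input_key", required=True)
    parser.add_argument("--s3_output_key", required=True)
    return parser.parse_args(argv)


# Same values the notebook passes to the processing job.
args = parse_args([
    "--s3_bucket", "capstone-transaction-data",
    "--s3_input_key", "preprocessed_data/test_transactions.parquet",
    "--s3_output_key", "processed_data/test_features.parquet",
])

# Assumed layout: the Spark job would read from and write to URIs built from these parts.
input_uri = f"s3://{args.s3_bucket}/{args.s3_input_key}"
output_uri = f"s3://{args.s3_bucket}/{args.s3_output_key}"
print(input_uri)
print(output_uri)
```

Keeping bucket and keys as separate flags lets the same script be rerun for the train and test splits by changing only the keys.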

In [10]:
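from sagemaker.spark.processing import PySparkProcessor

# Single-instance Spark 2.4 cluster on one ml.m5.4xlarge
spark_processor = PySparkProcessor(
    base_job_name="spark-feature-engineering",
    framework_version="2.4",
    role=role,
    instance_count=1,
    instance_type="ml.m5.4xlarge",
    max_runtime_in_seconds=1200,  # stop the job after 20 minutes
    volume_size_in_gb=60,         # storage volume attached to the instance
)

# Submit the PySpark script; the S3 locations are forwarded as command-line arguments
spark_processor.run(
    submit_app="spark/feature_eng.py",
    logs=False,  # do not stream container logs into the notebook
    arguments=['--s3_bucket', bucket,
               '--s3_input_key', input_key,
               '--s3_output_key', output_key]
)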


Job Name:  spark-feature-engineering-2021-07-27-18-44-12-065
Inputs:  [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-165065549497/spark-feature-engineering-2021-07-27-18-44-12-065/input/code/feature_eng.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []

## 2. Additional features

In this section we add a few ratios and other features using plain pandas.

Work in progress...
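The exact ratios are still to be decided. As a hypothetical example, two recency ratios could be built from columns that already exist in the feature table printed below: the share of total spend that falls in the last 30 days, and the 30-day average ticket relative to the lifetime average. The new column names and the `safe_ratio` helper are made up for illustration; a zero denominator is mapped to NaN rather than inf.

```python
import numpy as np
import pandas as pd

# Two toy customers, using column names from the feature table.
toy = pd.DataFrame({
    "sum_amount": [200.0, 50.0],
    "avg_transaction_amount": [4.0, 5.0],
    "purchase_amount_sum_30d": [50.0, 0.0],
    "avg_transaction_amount_30d": [5.0, 0.0],
})


def safe_ratio(num, den):
    """Element-wise num / den, returning NaN where the denominator is zero."""
    return num / den.replace(0, np.nan)


# Hypothetical ratio features (names are illustrative, not from the original notebook).
toy["share_amount_30d"] = safe_ratio(toy["purchase_amount_sum_30d"], toy["sum_amount"])
toy["avg_ticket_30d_to_lifetime"] = safe_ratio(toy["avg_transaction_amount_30d"],
                                               toy["avg_transaction_amount"])
print(toy[["share_amount_30d", "avg_ticket_30d_to_lifetime"]])
```

A share close to 1 would flag customers whose spend is concentrated in the most recent month.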

In [4]:
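def read_parquet_s3(bucket, filepath):
    """Download an S3 object into memory and load it as a pandas DataFrame."""
    buffer = io.BytesIO()
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket, filepath)
    obj.download_fileobj(buffer)
    buffer.seek(0)  # rewind the buffer before parsing
    return pd.read_parquet(buffer)

df = read_parquet_s3(bucket, "processed_data/train_features.parquet")
print(df.shape)
pd.set_option("display.max_columns", 99)  # show all 33 feature columns
df.head()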

(288081, 33)


|  | id | num_unique_date | max_transaction_amount | sum_amount | avg_transaction_amount | days_since_last_transaction | days_since_first_transaction | avg_daily_amount | unique_dates_to_days | unique_dates_180d | transactions_count_180d | purchase_amount_sum_180d | avg_transaction_amount_180d | purchasequantity_sum_180d | time_weighted_amount_180d | unique_dates_90d | transactions_count_90d | purchase_amount_sum_90d | avg_transaction_amount_90d | purchasequantity_sum_90d | time_weighted_amount_90d | unique_dates_60d | transactions_count_60d | purchase_amount_sum_60d | avg_transaction_amount_60d | purchasequantity_sum_60d | time_weighted_amount_60d | unique_dates_30d | transactions_count_30d | purchase_amount_sum_30d | avg_transaction_amount_30d | purchasequantity_sum_30d | time_weighted_amount_30d |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 12262064 | 120 | 65.959999 | 2295.72998 | 4.114212 | 1 | 213 | 10.778075 | 0.56338 | 109.0 | 524.0 | 2132.280029 | 4.069237 | 609.0 | 538.865601 | 54.0 | 267.0 | 1055.380005 | 3.952734 | 290.0 | 319.114532 | 37.0 | 185.0 | 749.809998 | 4.053027 | 201.0 | 249.026154 | 19.0 | 93.0 | 337.389984 | 3.627849 | 102.0 | 140.975647 |
| 1 | 12277270 | 93 | 24.98 | 3601.310059 | 5.789887 | 2 | 210 | 17.149096 | 0.442857 | 81.0 | 539.0 | 3129.820068 | 5.806716 | 623.0 | 840.18158 | 38.0 | 262.0 | 1507.849976 | 5.755153 | 310.0 | 509.219116 | 24.0 | 188.0 | 1091.97998 | 5.808404 | 226.0 | 413.054291 | 12.0 | 116.0 | 660.409973 | 5.69319 | 144.0 | 299.235992 |
| 2 | 12332190 | 49 | 164.899994 | 1416.559937 | 5.169927 | 0 | 210 | 6.745523 | 0.233333 | 45.0 | 259.0 | 1348.0 | 5.204633 | 328.0 | 347.427277 | 27.0 | 153.0 | 878.700012 | 5.743137 | 199.0 | 250.572433 | 15.0 | 107.0 | 704.51001 | 6.584206 | 141.0 | 210.490936 | 6.0 | 15.0 | 67.959999 | 4.530667 | 20.0 | 42.900917 |
| 3 | 12524696 | 70 | 21.99 | 1581.670044 | 3.886167 | 3 | 213 | 7.425681 | 0.328638 | 61.0 | 368.0 | 1440.27002 | 3.913777 | 463.0 | 347.924866 | 27.0 | 170.0 | 689.890015 | 4.058177 | 214.0 | 195.126587 | 16.0 | 103.0 | 396.540009 | 3.849903 | 137.0 | 127.830322 | 8.0 | 57.0 | 242.279999 | 4.250526 | 74.0 | 88.039108 |
| 4 | 12682470 | 73 | 36.990002 | 1578.01001 | 4.347135 | 3 | 213 | 7.408498 | 0.342723 | 63.0 | 278.0 | 1268.130005 | 4.561619 | 325.0 | 308.362732 | 30.0 | 112.0 | 546.119995 | 4.876071 | 134.0 | 161.810654 | 22.0 | 61.0 | 304.299988 | 4.988524 | 70.0 | 106.956429 | 13.0 | 41.0 | 196.849991 | 4.801219 | 48.0 | 78.548553 |
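The column names indicate that the Spark job repeats a set of statistics over trailing 180-, 90-, 60- and 30-day windows. Since the script is not shown, the following is only a pandas sketch of that pattern under stated assumptions: raw columns `id`, `date`, `purchaseamount` and `purchasequantity`; windows measured back from the latest date in the data, excluding the boundary day; and the `time_weighted_amount_*` columns left out because their weighting scheme is not documented here.

```python
import pandas as pd

# Toy transactions; raw column names are assumptions.
tx = pd.DataFrame({
    "id": [1, 1, 1, 2],
    "date": pd.to_datetime(["2013-01-01", "2013-05-01", "2013-06-25", "2013-01-15"]),
    "purchaseamount": [10.0, 4.0, 6.0, 8.0],
    "purchasequantity": [1, 2, 3, 1],
})
ref_date = tx["date"].max()  # assumed reference date
customers = pd.Index(tx["id"].unique(), name="id")

frames = []
for days in (180, 90, 60, 30):
    # Keep transactions strictly after (ref_date - days).
    recent = tx[tx["date"] > ref_date - pd.Timedelta(days=days)]
    agg = recent.groupby("id").agg(
        **{
            f"unique_dates_{days}d": ("date", "nunique"),
            f"transactions_count_{days}d": ("date", "count"),
            f"purchase_amount_sum_{days}d": ("purchaseamount", "sum"),
            f"avg_transaction_amount_{days}d": ("purchaseamount", "mean"),
            f"purchasequantity_sum_{days}d": ("purchasequantity", "sum"),
        }
    )
    # Customers with no transactions in the window get NaN.
    frames.append(agg.reindex(customers))

windowed = pd.concat(frames, axis=1)
print(windowed)
```

In this sketch, customer 2 only bought in January, so all of its 90-, 60- and 30-day columns are NaN, and those columns become floats as a result.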


In the next notebook we will analyze each feature and the target variable in detail.