## Data manipulation with PySpark

This notebook uses the PySpark API to manipulate data with Spark. Although the dataset used here is not large, the notebook is also meant to explore data management in Big Data contexts. Each code block is commented in detail, especially because PySpark differs in several ways from the usual pandas and NumPy APIs. For the same reason, only a few functions were created: procedural code makes each step easier to follow. For more on PySpark, see its [documentation](http://spark.apache.org/docs/latest/api/python/), in particular the [API reference](http://spark.apache.org/docs/latest/api/python/reference/index.html).

The notebook is organized as follows. First, modules, functions and classes are imported; then the data is loaded and processed according to the dataset's specificities, followed by a temporal train-test split. A section on descriptive statistics and data visualization explores different PySpark dataframe APIs. Next, a section shows how to create temporary views so that the data can be queried directly with SQL. The data pre-processing section starts by assessing missing values across the variables of the PySpark dataframe and ends with data transformations written in SQL, which makes these operations fast and straightforward to implement.

--------------

## Libraries

In [0]:
import pandas as pd
import numpy as np

from pyspark.sql import functions as func
from pyspark.sql.types import TimestampType

## Importing data

In [0]:
df = spark.read.format('csv').\
           options(header='true', delimiter = ',', inferSchema='true').\
           load("/FileStore/shared_uploads/matheusf.rosso@gmail.com/fraud_data_sample.csv")

# Sorting the dataframe by date:
df = df.sort(func.col('epoch').asc())

# Converting epoch into datetime:
df = df.withColumn('epoch', (func.col('epoch')/func.lit(1000)))
df = df.withColumn('datetime', func.date_format(df.epoch.cast(dataType=TimestampType()), "yyyy-MM-dd HH:mm:ss"))

# First and last dates, computed with a single aggregation:
date_range = df.agg(func.min('datetime').alias('first_date'), func.max('datetime').alias('last_date')).collect()[0]
first_date, last_date = date_range['first_date'], date_range['last_date']

print(f'Type of df: {type(df)}.')
print(f'Shape of df: ({df.count()}, {len(df.columns)}).')
print(f'Number of unique instances: {df.select("order_id").distinct().count()}.')
print(f'Time interval: ({first_date}, {last_date}).')

# Support variables:
drop_vars = ['y', 'order_amount', 'store_id', 'order_id', 'status', 'epoch', 'datetime', 'weight']

# display(df.take(3))
# df.display()

Type of df: <class 'pyspark.sql.dataframe.DataFrame'>.
Shape of df: (8361, 1429).
Number of unique instances: 8361.
Time interval: (2021-05-17 15:01:00, 2021-06-27 23:55:01).
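
The conversion above divides `epoch` by 1000, which suggests the raw values are in milliseconds, and then casts the result to a timestamp. As a sanity check outside Spark, the same arithmetic can be reproduced with the standard library. This is a minimal sketch, assuming the Spark session time zone is UTC (the printed first date is consistent with that); `epoch_ms_to_string` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime, timezone


def epoch_ms_to_string(epoch_ms):
    """Format an epoch in milliseconds as 'yyyy-MM-dd HH:mm:ss' in UTC (assumed time zone)."""
    seconds = epoch_ms / 1000  # same scaling as func.col('epoch') / func.lit(1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# The first row queried later in the notebook has epoch 1621263660.0 after the
# division by 1000, so the raw value is assumed to be 1621263660000 ms.
first_order = epoch_ms_to_string(1621263660000)
print(first_order)  # 2021-05-17 15:01:00
```

The result matches the lower bound of the time interval printed above. If the session time zone were not UTC, the Spark output would be shifted accordingly.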


### Train-test split

In [0]:
# Number of instances by date:
orders_by_date = df.withColumn('date', func.to_date(func.col('datetime'))).\
                    select('date', 'order_id').groupBy('date').agg(func.count('order_id').alias('freq'))
orders_by_date = orders_by_date.toPandas().sort_values('date', ascending=True)

# Accumulated number of instances by date:
orders_by_date['acum'] = np.cumsum(orders_by_date.freq)
orders_by_date['acum_share'] = [a/orders_by_date['acum'].max() for a in orders_by_date['acum']]

# Date whose accumulated share of orders is closest to 75%:
last_date_train = orders_by_date.iloc[np.argmin(abs(orders_by_date['acum_share'] - 0.75))]['date']

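# Train-test split (calendar dates are compared explicitly, so the whole cutoff day stays in the training data):
df_test = df.filter(func.to_date(func.col('datetime')) > func.lit(last_date_train))
df_train = df.filter(func.to_date(func.col('datetime')) <= func.lit(last_date_train))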

# Instances identification for training and test data:
train_orders, test_orders = ([r['order_id'] for r in df_train.select('order_id').collect()],
                             [r['order_id'] for r in df_test.select('order_id').collect()])

# First and last dates of each split, one aggregation per dataframe:
train_range = df_train.agg(func.min('datetime').alias('first_date'), func.max('datetime').alias('last_date')).collect()[0]
first_date_train, last_date_train = train_range['first_date'], train_range['last_date']
test_range = df_test.agg(func.min('datetime').alias('first_date'), func.max('datetime').alias('last_date')).collect()[0]
first_date_test, last_date_test = test_range['first_date'], test_range['last_date']

print(f'Shape of df_train: ({df_train.count()}, {len(df_train.columns)}).')
print(f'Number of unique instances (training data): {df_train.select("order_id").distinct().count()}.')
print(f'Time interval (training data): ({first_date_train}, {last_date_train}).\n')

print(f'Shape of df_test: ({df_test.count()}, {len(df_test.columns)}).')
print(f'Number of unique instances (test data): {df_test.select("order_id").distinct().count()}.')
print(f'Time interval (test data): ({first_date_test}, {last_date_test}).')

# df_train.display()

Shape of df_train: (6313, 1429).
Number of unique instances (training data): 6313.
Time interval (training data): (2021-05-17 15:01:00, 2021-06-09 23:56:06).

Shape of df_test: (2048, 1429).
Number of unique instances (test data): 2048.
Time interval (test data): (2021-06-10 00:01:08, 2021-06-27 23:55:01).
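
The split above is temporal: orders are counted per day, accumulated, and the cutoff is the day whose accumulated share is closest to 75%. The logic can be sketched without Spark or pandas as follows. The daily counts are hypothetical and `temporal_cutoff` is an illustrative helper, not a function from the notebook.

```python
from datetime import date
from itertools import accumulate


def temporal_cutoff(daily_counts, train_share=0.75):
    """Return the date whose accumulated share of orders is closest to train_share."""
    days = sorted(daily_counts)
    accumulated = list(accumulate(daily_counts[d] for d in days))
    total = accumulated[-1]
    closest = min(range(len(days)), key=lambda i: abs(accumulated[i] / total - train_share))
    return days[closest]


# Hypothetical daily order counts, for illustration only.
toy_counts = {
    date(2021, 5, 17): 10,
    date(2021, 5, 18): 20,
    date(2021, 5, 19): 30,
    date(2021, 5, 20): 25,
    date(2021, 5, 21): 15,
}

cutoff = temporal_cutoff(toy_counts)
toy_train_size = sum(n for d, n in toy_counts.items() if d <= cutoff)
toy_test_size = sum(n for d, n in toy_counts.items() if d > cutoff)
print(cutoff, toy_train_size, toy_test_size)  # 2021-05-20 85 15
```

Because the cutoff is a whole day, the realized training share rarely equals 75% exactly; in the notebook it is 6313 / 8361, roughly 75.5%.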


## Descriptive statistics and data visualization

### Distributions

In [0]:
# Share of orders by status:
display(df_train.select('status', 'order_id').groupBy('status').\
                 agg(func.count('order_id').alias('freq')).withColumn('share', (func.col('freq')/func.lit(df_train.count()))).\
                 sort(func.col('share').desc()))

status,freq,share
APPROVED,5886,0.932361793125297
DECLINED,264,0.0418184698241723
FRAUD,163,0.0258197370505306


In [0]:
# Share of risky orders by status:
only_pos = df_train.filter(func.col('y')==1)

display(only_pos.select('status', 'order_id').groupBy('status').\
                 agg(func.count('order_id').alias('freq')).withColumn('share', (func.col('freq')/func.lit(only_pos.count()))))

status,freq,share
FRAUD,163,0.3817330210772833
DECLINED,264,0.6182669789227166


### Relationship between label and order amount

In [0]:
# Distribution of order amount:
display(df_train.select('order_amount').describe())

summary,order_amount
count,6313.0
mean,131.40789798827336
stddev,229.2266772371007
min,12.48
max,6860.0


In [0]:
# Average order amount by status:
display(df_train.select('status', 'y', 'order_amount').groupBy('status').agg({'y': 'mean', 'order_amount': 'mean'}).sort('avg(order_amount)'))

status,avg(order_amount),avg(y)
APPROVED,117.5427862725072,0.0
FRAUD,257.24546012269934,1.0
DECLINED,362.8417045454547,1.0


In [0]:
# Distribution of order amount by label:
display(df_train.select('y', 'order_amount'))

y,order_amount
0.0,70.0
0.0,105.69
0.0,33.49
0.0,53.99
0.0,103.2
0.0,39.0
0.0,45.0
0.0,40.99
0.0,50.0
0.0,54.99


### Label and status over time

In [0]:
# Rate of risky orders by date:
display(df_train.union(df_test).withColumn('date', func.to_date(func.col('datetime'))).select('date', 'y').groupBy('date').agg({'y': 'mean'}))

date,avg(y)
2021-06-04,0.0448275862068965
2021-05-29,0.0460251046025104
2021-06-01,0.0695364238410596
2021-05-21,0.0692041522491349
2021-05-31,0.0569395017793594
2021-05-18,0.0807453416149068
2021-05-28,0.0697674418604651
2021-05-27,0.0631578947368421
2021-05-24,0.0618556701030927
2021-06-02,0.0919540229885057


In [0]:
# Statistics of label and order amount by train-test split:
display(df_train.withColumn('train_test', func.lit('train')).union(df_test.withColumn('train_test', func.lit('test'))).\
                                                             select('train_test', 'y', 'order_amount').\
                                                             groupBy('train_test').\
                                                             agg(func.mean('y'), func.mean('order_amount')))

train_test,avg(y),avg(order_amount)
train,0.067638206874703,131.40789798827336
test,0.02099609375,143.07281249999974


In [0]:
# Share of orders by status and train-test split:
orders_status_train = df_train.select('status', 'order_id').groupBy('status').\
                               agg(func.count('order_id').alias('freq')).withColumn('share', (func.col('freq')/func.lit(df_train.count()))).\
                               sort(func.col('share').desc()).withColumn('train_test', func.lit('train'))
orders_status_test = df_test.select('status', 'order_id').groupBy('status').\
                             agg(func.count('order_id').alias('freq')).withColumn('share', (func.col('freq')/func.lit(df_test.count()))).\
                             sort(func.col('share').desc()).withColumn('train_test', func.lit('test'))

display(orders_status_train.union(orders_status_test))

status,freq,share,train_test
APPROVED,5886,0.932361793125297,train
DECLINED,264,0.0418184698241723,train
FRAUD,163,0.0258197370505306,train
APPROVED,2005,0.97900390625,test
DECLINED,39,0.01904296875,test
FRAUD,4,0.001953125,test
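
The two tables above are linked: in the training data the label `y` averages 1.0 for DECLINED and FRAUD orders and 0.0 for APPROVED ones, so the rate of risky orders should equal the combined share of those two statuses. The short check below redoes that arithmetic using only the counts displayed above; `risky_rate` is a hypothetical helper.

```python
# Status frequencies copied from the table above.
status_counts = {
    "train": {"APPROVED": 5886, "DECLINED": 264, "FRAUD": 163},
    "test": {"APPROVED": 2005, "DECLINED": 39, "FRAUD": 4},
}


def risky_rate(counts):
    """Share of orders whose status is DECLINED or FRAUD."""
    total = sum(counts.values())
    return (counts["DECLINED"] + counts["FRAUD"]) / total


rates = {split: risky_rate(counts) for split, counts in status_counts.items()}
for split, rate in rates.items():
    print(f"{split}: {rate:.4f}")
```

The results agree with the `avg(y)` values reported for each split (about 6.8% for training and 2.1% for test), which makes the drop in the risky rate between the two periods easy to see.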


## Handling data using SQL

In [0]:
# Creating a temporary view table (training data):
df_train.createOrReplaceTempView("training_data")

# Creating a temporary view table (test data):
df_test.createOrReplaceTempView("test_data")

### Simple queries

In [0]:
%sql
SELECT store_id, order_id, epoch, order_amount, status, y FROM training_data LIMIT 10

store_id,order_id,epoch,order_amount,status,y
21519.0,PAG-2105CCCDACB7,1621263660.0,70.0,APPROVED,0.0
21519.0,PAG-2105CCDCB063,1621263960.0,105.69,APPROVED,0.0
21519.0,PAG-2105CCC5A070,1621264261.0,33.49,APPROVED,0.0
21519.0,PAG-2105CC597D43,1621264860.0,53.99,APPROVED,0.0
21519.0,PAG-2105CCFBA250,1621265341.0,103.2,APPROVED,0.0
21519.0,PAG-2105CC7C0AC0,1621265640.0,39.0,APPROVED,0.0
21519.0,PAG-2105CC32BB1F,1621266660.0,45.0,APPROVED,0.0
21519.0,PAG-2105CCA71763,1621266780.0,40.99,APPROVED,0.0
21519.0,PAG-2105CC35AF16,1621267440.0,50.0,APPROVED,0.0
21519.0,PAG-2105CC8096FD,1621267800.0,54.99,APPROVED,0.0


In [0]:
%sql
SELECT `SHIPPINGSTATE()`, AVG(y) AS fraud_rate, AVG(order_amount) FROM training_data GROUP BY `SHIPPINGSTATE()` ORDER BY fraud_rate DESC

SHIPPINGSTATE(),fraud_rate,avg(order_amount)
AC,1.0,262.1
RR,1.0,450.96
SE,0.4117647058823529,155.84705882352938
TO,0.4,322.604
MA,0.3125,121.70062500000002
RN,0.28,147.94559999999996
RO,0.2727272727272727,267.02181818181816
CE,0.2083333333333333,212.67
AL,0.1428571428571428,162.40047619047616
PE,0.1323529411764706,154.5892647058823


In [0]:
%sql
SELECT ln(`TOTALORDERAMOUNT()`+0.0001), CASE WHEN `TIMESINCEFIRSTSEENMAIL()` IS NOT NULL THEN ln(`TIMESINCEFIRSTSEENMAIL()`+0.0001) ELSE 0 END AS `L#TIMESINCEFIRSTSEENMAIL()` FROM training_data LIMIT 10

ln((TOTALORDERAMOUNT() + CAST(0.0001 AS DOUBLE))),L#TIMESINCEFIRSTSEENMAIL()
4.248496670619767,14.686819311857638
4.6605112271841165,14.18614909596346
3.511249872767616,10.307443963132329
3.9888006964233313,14.93931697924544
4.636669822039241,14.630036080794666
3.663564210228923,14.0879428529841
3.8066647119900727,11.368891379263845
3.71333057413269,13.958945049167298
3.912025005426146,14.500012495906462
4.007153169030527,14.5652779722226


### Converting a view table into a PySpark dataframe

In [0]:
orders_by_city = spark.sql('SELECT `SHIPPINGCITY()`, COUNT(DISTINCT order_id) AS num_orders FROM training_data GROUP BY `SHIPPINGCITY()` ORDER BY num_orders DESC')

print(f'Type of "orders_by_city": {type(orders_by_city)}.')
orders_by_city.take(3)

Type of "orders_by_city": <class 'pyspark.sql.dataframe.DataFrame'>.
Out[13]: [Row(SHIPPINGCITY()='São Paulo', num_orders=1039),
 Row(SHIPPINGCITY()='Rio de Janeiro', num_orders=358),
 Row(SHIPPINGCITY()='são paulo', num_orders=250)]

## Data pre-processing

### Assessing missing values

#### Training data

In [0]:
# Dataframe with frequency of missings by feature:
missings_df_train = df_train.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_train.columns])

# Converting the dataframe into pandas for ready use:
missings_df_train = missings_df_train.toPandas().T.reset_index(drop=False)
missings_df_train.columns = ['feature', 'missings']

# Share of observations with missing value for each feature:
num_obs_train = df_train.count()
missings_df_train['share'] = missings_df_train['missings'].apply(lambda x: x/num_obs_train)

# List of variables with missings:
vars_missings_train = list(missings_df_train[missings_df_train.missings > 0]['feature'])

print(f'Number of features with missings: {sum(missings_df_train["missings"] > 0)} ({np.nanmean(missings_df_train["missings"] > 0)*100:.2f}%).')
print(f'Average number of missings: {missings_df_train["missings"].mean():.0f} ({(missings_df_train["missings"].mean()/num_obs_train)*100:.2f}%).')
# missings_df_train.sample(10)

Number of features with missings: 291 (20.36%).
Average number of missings: 353 (5.59%).
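
The cell above counts nulls per column in a single pass and then derives the share of affected rows for each feature. The same bookkeeping is sketched below in plain Python on hypothetical rows, with `None` playing the role of a null; `missing_summary` is an illustrative helper.

```python
def missing_summary(rows, columns):
    """Return {column: (number of missing values, share of rows missing)}."""
    num_rows = len(rows)
    summary = {}
    for column in columns:
        missing = sum(1 for row in rows if row.get(column) is None)
        summary[column] = (missing, missing / num_rows)
    return summary


# Hypothetical rows, for illustration only.
toy_rows = [
    {"a": 1.0, "b": None, "c": "x"},
    {"a": None, "b": None, "c": "y"},
    {"a": 3.0, "b": 2.0, "c": "z"},
    {"a": 4.0, "b": None, "c": "w"},
]

toy_summary = missing_summary(toy_rows, ["a", "b", "c"])
toy_vars_missings = [c for c, (missing, _) in toy_summary.items() if missing > 0]

print(toy_summary)        # {'a': (1, 0.25), 'b': (3, 0.75), 'c': (0, 0.0)}
print(toy_vars_missings)  # ['a', 'b']
```

The "average number of missings" printed by the notebook is the mean of these per-feature counts over all features, including those with no missing values.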


#### Test data

In [0]:
# Dataframe with frequency of missings by feature:
missings_df_test = df_test.select([func.count(func.when(func.col(c).isNull(), c)).alias(c) for c in df_test.columns])

# Converting the dataframe into pandas for ready use:
missings_df_test = missings_df_test.toPandas().T.reset_index(drop=False)
missings_df_test.columns = ['feature', 'missings']

# Share of observations with missing value for each feature:
num_obs_test = df_test.count()
missings_df_test['share'] = missings_df_test['missings'].apply(lambda x: x/num_obs_test)

print(f'Number of features with missings: {sum(missings_df_test["missings"] > 0)} ({np.nanmean(missings_df_test["missings"] > 0)*100:.2f}%).')
print(f'Average number of missings: {missings_df_test["missings"].mean():.0f} ({(missings_df_test["missings"].mean()/num_obs_test)*100:.2f}%).')
# missings_df_test.sample(10)

Number of features with missings: 164 (11.48%).
Average number of missings: 102 (4.97%).


### Data types of features

In [0]:
# Lists with categorical and numerical variables:
cat_vars = [name for name, dtype in df_train.dtypes if dtype == 'string' and name not in drop_vars]
num_vars = [name for name, dtype in df_train.dtypes if dtype != 'string' and name not in drop_vars]

# Number of unique values for a sample of data:
unique_values_sample = df_train.select(num_vars).sample(fraction=0.05).toPandas().nunique()

# Name of numerical variables with sufficient variation for log transformation:
unique_values_sample = pd.DataFrame(data={
  'feature': unique_values_sample.index, 'num_uniques': unique_values_sample.values
})
to_log = list(unique_values_sample[unique_values_sample.num_uniques>100]['feature'])
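
The selection above has two steps: columns are split into categorical and numerical by their Spark type, and numerical columns with more than 100 distinct values in a 5% sample are marked for log transformation. A minimal sketch of that logic follows. The `(name, type)` pairs imitate the shape of `DataFrame.dtypes`, and both the pairs and the distinct-value counts are hypothetical (`NUMITEMS()` in particular is an invented column name).

```python
# Hypothetical (name, type) pairs in the shape returned by DataFrame.dtypes.
toy_dtypes = [
    ("order_id", "string"),
    ("y", "double"),
    ("SHIPPINGSTATE()", "string"),
    ("TOTALORDERAMOUNT()", "double"),
    ("NUMITEMS()", "int"),
]
toy_drop_vars = ["y", "order_id"]

toy_cat_vars = [name for name, dtype in toy_dtypes if dtype == "string" and name not in toy_drop_vars]
toy_num_vars = [name for name, dtype in toy_dtypes if dtype != "string" and name not in toy_drop_vars]

# Hypothetical number of distinct values observed in a sample of the data.
toy_num_uniques = {"TOTALORDERAMOUNT()": 250, "NUMITEMS()": 7}
toy_to_log = [name for name in toy_num_vars if toy_num_uniques[name] > 100]

print(toy_cat_vars)  # ['SHIPPINGSTATE()']
print(toy_num_vars)  # ['TOTALORDERAMOUNT()', 'NUMITEMS()']
print(toy_to_log)    # ['TOTALORDERAMOUNT()']
```

Since the notebook draws its sample without a fixed seed, the resulting `to_log` list may vary slightly between runs.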

### Logarithmic transformation

In [0]:
# Query clause for log transforming numerical data:
log_transf = "CASE WHEN `{feat}` IS NOT NULL THEN ln(`{feat}`+0.0001) ELSE 0 END AS `L#{feat}`"
log_transf = ', '.join([log_transf.format(feat=c) for c in to_log])
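
The clause above is a string template expanded once per feature and joined with commas, so that it can be dropped into a `SELECT` list. Printing the expansion for a couple of features makes the generated SQL easier to inspect. The sketch below uses two column names that appear earlier in the notebook; whether they actually end up in `to_log` is an assumption.

```python
template = "CASE WHEN `{feat}` IS NOT NULL THEN ln(`{feat}`+0.0001) ELSE 0 END AS `L#{feat}`"

# Assumed members of to_log, for illustration only.
toy_to_log = ["TOTALORDERAMOUNT()", "TIMESINCEFIRSTSEENMAIL()"]

toy_log_transf = ", ".join(template.format(feat=c) for c in toy_to_log)
toy_clauses = toy_log_transf.split(", ")

for clause in toy_clauses:
    print(clause)
```

The backticks matter here: the feature names contain parentheses, and the new aliases contain `#`, so they would not parse as plain identifiers.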

### Treating missing values

In [0]:
# Query clause for creating binary variable indicating missing values:
missing_vars = "CASE WHEN `{feat}` IS NULL THEN 1 ELSE 0 END AS `NA#{feat}`"
missing_vars = ', '.join([missing_vars.format(feat=c) for c in num_vars if c in vars_missings_train])

# Query clause to impute missing values of numerical variables (except those already treated during the log transformation):
impute_missings_num = "CASE WHEN `{feat}` IS NULL THEN 0 ELSE `{feat}` END AS `{feat}`"
impute_missings_num = ', '.join([impute_missings_num.format(feat=c) for c in num_vars if c not in to_log])

# Query clause to impute missing values of categorical variables:
impute_missings_cat = "CASE WHEN `{feat}` IS NULL THEN 'missing_value' ELSE `{feat}` END AS `{feat}`"
impute_missings_cat = ', '.join([impute_missings_cat.format(feat=c) for c in cat_vars])

# Query clause with support variables:
drop_vars_query = ', '.join(drop_vars)
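
The three templates above produce, respectively, a binary missing-value indicator, a zero-imputed numerical column, and a categorical column with nulls replaced by `'missing_value'`. To see what they return, the sketch below expands them for two hypothetical columns and runs the resulting `SELECT` on an in-memory SQLite table. SQLite is used only because it needs no Spark session; it accepts the same backtick-quoted identifiers and `CASE WHEN` syntax for this particular query.

```python
import sqlite3

indicator = "CASE WHEN `{feat}` IS NULL THEN 1 ELSE 0 END AS `NA#{feat}`"
impute_num = "CASE WHEN `{feat}` IS NULL THEN 0 ELSE `{feat}` END AS `{feat}`"
impute_cat = "CASE WHEN `{feat}` IS NULL THEN 'missing_value' ELSE `{feat}` END AS `{feat}`"

# Hypothetical columns, for illustration only.
toy_num_feats = ["AMOUNT()"]
toy_cat_feats = ["STATE()"]

select_list = ", ".join(
    [indicator.format(feat=c) for c in toy_num_feats]
    + [impute_num.format(feat=c) for c in toy_num_feats]
    + [impute_cat.format(feat=c) for c in toy_cat_feats]
)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE toy (`AMOUNT()` REAL, `STATE()` TEXT)")
conn.executemany("INSERT INTO toy VALUES (?, ?)", [(10.5, "SP"), (None, None)])
toy_result = conn.execute(f"SELECT {select_list} FROM toy ORDER BY rowid").fetchall()
conn.close()

print(toy_result)  # [(0, 10.5, 'SP'), (1, 0, 'missing_value')]
```

Each row comes back as (indicator, imputed number, imputed category). The indicator keeps the information that a value was missing even after the original column has been filled.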

#### Training data

In [0]:
# Concatenating the query and running it to produce the transformed dataframe:
query = f'SELECT {drop_vars_query}, {log_transf}, {missing_vars}, {impute_missings_num}, {impute_missings_cat} FROM training_data'
df_train = spark.sql(query)

# First and last dates, computed with a single aggregation:
train_range = df_train.agg(func.min('datetime').alias('first_date'), func.max('datetime').alias('last_date')).collect()[0]
first_date_train, last_date_train = train_range['first_date'], train_range['last_date']

print(f'Shape of df_train: ({df_train.count()}, {len(df_train.columns)}).')
print(f'Number of unique instances (training data): {df_train.select("order_id").distinct().count()}.')
print(f'Time interval (training data): ({first_date_train}, {last_date_train}).')

# df_train.display()

Shape of df_train: (6313, 1717).
Number of unique instances (training data): 6313.
Time interval (training data): (2021-05-17 15:01:00, 2021-06-09 23:56:06).


#### Test data

In [0]:
# Concatenating the query and running it to produce the transformed dataframe:
query = f'SELECT {drop_vars_query}, {log_transf}, {missing_vars}, {impute_missings_num}, {impute_missings_cat} FROM test_data'
df_test = spark.sql(query)

# First and last dates, computed with a single aggregation:
test_range = df_test.agg(func.min('datetime').alias('first_date'), func.max('datetime').alias('last_date')).collect()[0]
first_date_test, last_date_test = test_range['first_date'], test_range['last_date']

print(f'Shape of df_test: ({df_test.count()}, {len(df_test.columns)}).')
print(f'Number of unique instances (test data): {df_test.select("order_id").distinct().count()}.')
print(f'Time interval (test data): ({first_date_test}, {last_date_test}).')

# df_test.display()

Shape of df_test: (2048, 1717).
Number of unique instances (test data): 2048.
Time interval (test data): (2021-06-10 00:01:08, 2021-06-27 23:55:01).
