In [1]:
import os
from pathlib import Path

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F
from pyspark.sql import Window

#### Get current working directory

In [2]:
# Current working directory of the kernel; the CSV read below is resolved relative to it.
cwd = os.getcwd()

cwd

'C:\\Users\\randy\\Desktop\\Personal\\github\\demo_pyspark_sql'

#### Initialise spark

In [3]:
# Reuse the active SparkSession if there is one, otherwise create one with default settings.
spark = SparkSession.builder.getOrCreate()

# Display the session object.
spark

#### Read local csv into spark

In [4]:
# Read bank.csv from the local filesystem.
# Path.as_uri() builds a real file URI ('file:///C:/...' on Windows, 'file:///home/...'
# on POSIX). The previous os.path.join('file://', cwd, ...) silently discarded the
# 'file://' prefix because cwd is absolute, so Spark received a bare path that is
# resolved against the default filesystem (e.g. HDFS on a cluster), not local disk.
# header=True takes column names from the first line; no schema inference is
# requested, so every column is read as string and typed explicitly later.
df = spark.read.csv(
    Path(cwd, 'bank.csv').as_uri(),
    header=True
)

#### Analyse schema

In [5]:
# Inspect the raw schema: every column is a string because the CSV was read without schema inference.
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- pdays: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



#### Count records

In [6]:
# Total number of rows loaded (runs a Spark job over the whole file).
df.count()

11162

#### Sample records

In [7]:
# Bring only the first 3 rows to the driver as a pandas DataFrame for display.
df.limit(3).toPandas()

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,deposit
0,59,admin.,married,secondary,no,2343,yes,no,unknown,5,may,1042,1,-1,0,unknown,yes
1,56,admin.,married,secondary,no,45,no,no,unknown,5,may,1467,1,-1,0,unknown,yes
2,41,technician,married,secondary,no,1270,yes,no,unknown,5,may,1389,1,-1,0,unknown,yes


#### Get distinct values of each column

In [8]:
# Map each column name to the list of its distinct values (nulls included as None).
# One Spark job per column; values are collected to the driver.
col_distinct_val = dict.fromkeys(df.columns)

for col in col_distinct_val:
    print('Analysing', col)
    distinct_rows = df.select(col).distinct().collect()
    col_distinct_val[col] = [row[col] for row in distinct_rows]

Analysing age
Analysing job
Analysing marital
Analysing education
Analysing default
Analysing balance
Analysing housing
Analysing loan
Analysing contact
Analysing day
Analysing month
Analysing duration
Analysing campaign
Analysing pdays
Analysing previous
Analysing poutcome
Analysing deposit


#### Identify categorical and numerical columns

In [9]:
def is_number(value: object) -> bool:
    """Return True if ``value`` can be parsed by ``float()``, otherwise False.

    Used to decide whether a string column read from CSV holds numbers.
    ``None`` (a null cell) returns False because ``float(None)`` raises TypeError.
    Strings such as 'nan' or 'inf' are accepted by ``float()`` and so count as numbers.
    """
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        # These are the failures float() raises for values it cannot convert;
        # any other exception is unexpected and should surface instead of being hidden.
        return False
    return True

# Split columns into numerical / categorical based on their distinct values.
numerical_cols = []
categorical_cols = []

for col, values in col_distinct_val.items():

    print('Analysing', col)
    # Ignore nulls: a missing cell (None) must not push an otherwise numeric column
    # into the categorical bucket. A column with no non-null values yields an empty
    # set and therefore stays categorical.
    test = {is_number(value) for value in values if value is not None}

    if test == {True}:
        numerical_cols.append(col)
    else:
        categorical_cols.append(col)

Analysing age
Analysing job
Analysing marital
Analysing education
Analysing default
Analysing balance
Analysing housing
Analysing loan
Analysing contact
Analysing day
Analysing month
Analysing duration
Analysing campaign
Analysing pdays
Analysing previous
Analysing poutcome
Analysing deposit


In [10]:
# Show which columns ended up in each bucket.
print(f'Numerical: {numerical_cols} \n')
print(f'Categorical: {categorical_cols}')

Numerical: ['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous'] 

Categorical: ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome', 'deposit']


#### Cast numerical columns to double

In [11]:
# Cast every numerical column to double in a single projection, keeping column order.
# One select instead of a withColumn per column: chained withColumn calls each add a
# projection and can produce very large plans (see the DataFrame.withColumn docs).
df = df.select([
    df[col].cast(DoubleType()).alias(col) if col in numerical_cols else df[col]
    for col in df.columns
])

df.printSchema()

root
 |-- age: double (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: double (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: double (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- campaign: double (nullable = true)
 |-- pdays: double (nullable = true)
 |-- previous: double (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



#### Summary of numerical columns

In [12]:
# count / mean / stddev / min / quartiles / max for every numerical column.
df.select(*numerical_cols).summary().toPandas()

Unnamed: 0,summary,age,balance,day,duration,campaign,pdays,previous
0,count,11162.0,11162.0,11162.0,11162.0,11162.0,11162.0,11162.0
1,mean,41.2319476796273,1528.5385235620856,15.658036194230425,371.9938183121304,2.508421429851281,51.33040673714388,0.8325568894463358
2,stddev,11.913369192215518,3225.413325946149,8.420739541006462,347.12838571630687,2.7220771816614824,108.75828197197715,2.292007218670508
3,min,18.0,-6847.0,1.0,2.0,1.0,-1.0,0.0
4,25%,32.0,122.0,8.0,138.0,1.0,-1.0,0.0
5,50%,39.0,550.0,15.0,255.0,2.0,-1.0,0.0
6,75%,49.0,1708.0,22.0,496.0,3.0,20.0,1.0
7,max,95.0,81204.0,31.0,3881.0,63.0,854.0,58.0


#### Summary of categorical columns

In [13]:
# List the distinct values collected earlier for each categorical column.
for col in categorical_cols:
    print(f'{col}\n>> {col_distinct_val[col]} \n')

job
>> ['management', 'retired', 'unknown', 'self-employed', 'student', 'blue-collar', 'entrepreneur', 'admin.', 'technician', 'services', 'housemaid', 'unemployed'] 

marital
>> ['divorced', 'married', 'single'] 

education
>> ['unknown', 'tertiary', 'secondary', 'primary'] 

default
>> ['no', 'yes'] 

housing
>> ['no', 'yes'] 

loan
>> ['no', 'yes'] 

contact
>> ['unknown', 'cellular', 'telephone'] 

month
>> ['jun', 'aug', 'may', 'feb', 'sep', 'mar', 'oct', 'jul', 'nov', 'apr', 'dec', 'jan'] 

poutcome
>> ['success', 'unknown', 'other', 'failure'] 

deposit
>> ['no', 'yes'] 



#### Standardise values of an existing column

In [14]:
# Fold the 'other' outcome into 'unknown'; every other value (including null) is kept as is.
is_other = F.col('poutcome') == 'other'

df = df.withColumn(
    'poutcome',
    F.when(is_other, F.lit('unknown')).otherwise(F.col('poutcome'))
)

df.select('poutcome').distinct().toPandas()

Unnamed: 0,poutcome
0,success
1,unknown
2,failure


#### Balance summary by education and marital status

In [15]:
# Balance statistics per (education, marital) group, ordered by education
# ascending and marital status descending.
df.groupby(
    'education',
    'marital'
).agg(
    # Count rows, not non-null 'job' values: F.count('job') would silently
    # undercount any group that contains a null job.
    F.count(F.lit(1)).alias('tot_population'),
    F.sum('balance').alias('tot_balance'),
    F.round(F.avg('balance'), 2).alias('avg_balance'),
    F.min('balance').alias('min_balance'),
    F.max('balance').alias('max_balance')
).orderBy(
    F.asc('education'),
    F.desc('marital')
).toPandas()

Unnamed: 0,education,marital,tot_population,tot_balance,avg_balance,min_balance,max_balance
0,primary,single,197,271800.0,1379.7,-887.0,26965.0
1,primary,married,1099,1723931.0,1568.64,-1489.0,66653.0
2,primary,divorced,204,288816.0,1415.76,-779.0,37127.0
3,secondary,single,1704,1942320.0,1139.86,-1139.0,56831.0
4,secondary,married,3120,4500996.0,1442.63,-1965.0,81204.0
5,secondary,divorced,652,656210.0,1006.46,-934.0,12039.0
6,tertiary,single,1460,2637784.0,1806.7,-880.0,36252.0
7,tertiary,married,1843,3433041.0,1862.75,-6847.0,51439.0
8,tertiary,divorced,386,738586.0,1913.44,-2282.0,52587.0
9,unknown,single,157,274720.0,1749.81,-461.0,45248.0


#### Top 3 balances by marital status

In [16]:
# Highest balances within each marital status.
# dense_rank gives equal balances the same rank, so a group can return more than
# top_n rows when balances tie.
top_n = 3

balance_rank_window = Window.partitionBy('marital').orderBy(F.desc('balance'))

df.withColumn(
    'rank',
    F.dense_rank().over(balance_rank_window)
).filter(
    F.col('rank') <= top_n
).orderBy(
    # Order by rank inside each group too; sorting on 'marital' alone leaves the
    # row order within a group unspecified.
    F.desc('marital'),
    F.asc('rank')
).select(
    'marital',
    'job',
    'age',
    'balance',
    'rank'
).toPandas()

Unnamed: 0,marital,job,age,balance,rank
0,single,admin.,43.0,56831.0,1
1,single,technician,39.0,45248.0,2
2,single,management,29.0,36252.0,3
3,married,retired,84.0,81204.0,1
4,married,retired,84.0,81204.0,1
5,married,blue-collar,52.0,66653.0,2
6,married,entrepreneur,56.0,51439.0,3
7,divorced,self-employed,61.0,52587.0,1
8,divorced,self-employed,61.0,52587.0,1
9,divorced,retired,75.0,37127.0,2


#### Create new column for negative balance check

In [17]:
# Integer flag: 1 when balance < 0, otherwise 0 (a null balance also maps to 0).
balance_is_negative = F.col('balance') < 0

df = df.withColumn(
    'negative_balance',
    F.when(balance_is_negative, 1).otherwise(0)
)

df.printSchema()

root
 |-- age: double (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: double (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: double (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- campaign: double (nullable = true)
 |-- pdays: double (nullable = true)
 |-- previous: double (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)
 |-- negative_balance: integer (nullable = false)



#### Check if negative balance is a one-off or common occurrence

In [18]:
# Number of rows per flag value.
df.groupBy('negative_balance').count().toPandas()

Unnamed: 0,negative_balance,count
0,1,688
1,0,10474


#### Rename created column

In [19]:
# Give the flag column a shorter name.
df = df.withColumnRenamed('negative_balance', 'neg_bal')

df.printSchema()

root
 |-- age: double (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: double (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: double (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- campaign: double (nullable = true)
 |-- pdays: double (nullable = true)
 |-- previous: double (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)
 |-- neg_bal: integer (nullable = false)



#### Drop created column

In [20]:
# Remove the temporary flag column again.
df = df.drop(
    'neg_bal'
)

df.printSchema()

root
 |-- age: double (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: double (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: double (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- campaign: double (nullable = true)
 |-- pdays: double (nullable = true)
 |-- previous: double (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



#### Create a month lookup dataframe

In [21]:
# Lookup records mapping the dataset's lowercase month abbreviations to month numbers 1-12.
month_dict = [
    {'month': abbr, 'month_num': num}
    for num, abbr in enumerate(
        ['jan', 'feb', 'mar', 'apr', 'may', 'jun',
         'jul', 'aug', 'sep', 'oct', 'nov', 'dec'],
        start=1
    )
]

# Build the Spark lookup table via an intermediate pandas DataFrame.
month_pdf = pd.DataFrame(month_dict)
month_df = spark.createDataFrame(month_pdf)

month_df.toPandas()

Unnamed: 0,month,month_num
0,jan,1
1,feb,2
2,mar,3
3,apr,4
4,may,5
5,jun,6
6,jul,7
7,aug,8
8,sep,9
9,oct,10


#### Join dataframes

In [22]:
# Enrich each record with its month number.
# Left join so enrichment never drops records: with an inner join, any row whose
# 'month' is absent from month_df (typo, different casing, null) would vanish
# silently. Unmatched rows keep a null month_num instead.
df = df.join(
    month_df,
    on='month',
    how='left'
)

df.limit(3).toPandas()

Unnamed: 0,month,age,job,marital,education,default,balance,housing,loan,contact,day,duration,campaign,pdays,previous,poutcome,deposit,month_num
0,jan,57.0,unemployed,married,primary,no,2743.0,no,no,cellular,29.0,89.0,1.0,-1.0,0.0,unknown,no,1
1,jan,34.0,services,single,secondary,no,2257.0,no,yes,telephone,30.0,568.0,2.0,-1.0,0.0,unknown,no,1
2,jan,34.0,blue-collar,married,primary,no,6718.0,no,no,cellular,13.0,278.0,4.0,97.0,1.0,unknown,no,1


#### Union dataframes

In [23]:
# Stack the frame on top of itself (same schema on both sides), doubling the row count.
df = df.unionByName(df)

df.count()

22324

#### Deduplicate dataframe

In [24]:
# Drop exact duplicate rows; distinct() matches dropDuplicates() called without a subset.
df = df.distinct()

df.count()

11162

#### Stop spark

In [25]:
# Stop the SparkSession; DataFrames created from it (df, month_df) cannot be used afterwards.
spark.stop()