In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Reuse an existing session if one is running; otherwise start one that uses Kryo serialization.
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [5]:
# Load the airline satisfaction CSV: first row is the header, column types are inferred.
df = (
    spark.read
    .options(delimiter=',', inferSchema='True', header='True')
    .csv("data/Invistico_Airline.csv")
)

In [6]:
# Preview the first five rows.
df.show(n=5)

+------------+------+--------------+---+---------------+--------+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+----------------+----------------+---------------+-----------+---------------+--------------------------+------------------------+
|satisfaction|Gender| Customer Type|Age| Type of Travel|   Class|Flight Distance|Seat comfort|Departure/Arrival time convenient|Food and drink|Gate location|Inflight wifi service|Inflight entertainment|Online support|Ease of Online booking|On-board service|Leg room service|Baggage handling|Checkin service|Cleanliness|Online boarding|Departure Delay in Minutes|Arrival Delay in Minutes|
+------------+------+--------------+---+---------------+--------+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+-------

In [7]:
# Passenger count per (Class, Age), youngest first.
# Order by 'Age' with the column's exact casing: the lowercase 'age' only resolves
# while spark.sql.caseSensitive is left at its default (false).
df.groupBy('Class', 'Age').count().orderBy('Age').show()

+--------+---+-----+
|   Class|Age|count|
+--------+---+-----+
|     Eco|  7|  532|
|Eco Plus|  7|   81|
|Business|  7|   72|
|     Eco|  8|  588|
|Business|  8|  106|
|Eco Plus|  8|  103|
|Business|  9|  128|
|     Eco|  9|  641|
|Eco Plus|  9|   90|
|     Eco| 10|  622|
|Business| 10|  111|
|Eco Plus| 10|   89|
|Eco Plus| 11|  100|
|Business| 11|  120|
|     Eco| 11|  617|
|Eco Plus| 12|   82|
|Business| 12|  130|
|     Eco| 12|  582|
|Eco Plus| 13|   75|
|     Eco| 13|  602|
+--------+---+-----+
only showing top 20 rows



In [8]:
def age_category(age):
    """Map a numeric age to a coarse age-group label.

    Args:
        age: Age in years, or None (a null value is handed to a Python UDF as None).

    Returns:
        "Young" when age <= 25, "Mid aged" when age <= 40, "Old" when age <= 60,
        "Very Old" otherwise; None when ``age`` is None.
    """
    # Without this guard a null Age raises TypeError on the first comparison,
    # which fails the whole Spark job instead of yielding a null category.
    if age is None:
        return None
    if age <= 25:
        return "Young"
    elif age <= 40:
        return "Mid aged"
    elif age <= 60:
        return "Old"
    else:
        return "Very Old"

In [9]:
# Wrap the plain Python function as a Spark UDF that returns a string column.
age_udf = udf(f=age_category, returnType=StringType())

In [10]:
# Derive the age-group label from the Age column and attach it as 'age_category'.
age_column = df['Age']
df = df.withColumn('age_category', age_udf(age_column))

In [11]:
# Spot-check the mapping from Age to age_category.
df.select(['Age', 'age_category']).show()

+---+------------+
|Age|age_category|
+---+------------+
| 65|    Very Old|
| 47|         Old|
| 15|       Young|
| 60|         Old|
| 70|    Very Old|
| 30|    Mid aged|
| 66|    Very Old|
| 10|       Young|
| 56|         Old|
| 22|       Young|
| 58|         Old|
| 34|    Mid aged|
| 62|    Very Old|
| 35|    Mid aged|
| 47|         Old|
| 60|         Old|
| 13|       Young|
| 52|         Old|
| 55|         Old|
| 28|    Mid aged|
+---+------------+
only showing top 20 rows



In [12]:
# Number of passengers in each age group.
df.groupby('age_category').count().show()

+------------+-----+
|age_category|count|
+------------+-----+
|    Very Old|10054|
|         Old|52554|
|       Young|28173|
|    Mid aged|39099|
+------------+-----+



Pandas UDF

Pandas UDFs are usually much faster and more efficient, in terms of processing and execution time, than standard Python UDFs. The main difference
between a normal Python UDF and a Pandas UDF is that a Python UDF is executed row by row: every value is serialized and handed to the Python function individually, so it gives up much of the performance benefit of the distributed framework and can take longer. A Pandas UDF instead executes block by block, receiving a whole batch of rows as a pandas Series, and therefore gives faster results.

There are three different types of Pandas UDFs: scalar, grouped map, and grouped agg.

In [13]:
# Summary statistics (count, mean, stddev, min, quartiles, max) for Age.
df.select(df['Age']).summary().show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|            129880|
|   mean| 39.42795657530028|
| stddev|15.119359950371694|
|    min|                 7|
|    25%|                27|
|    50%|                40|
|    75%|                51|
|    max|                85|
+-------+------------------+



In [14]:
# Scaling bounds for Age, read from the data rather than copied by hand from the
# summary output, so they stay correct if the input file changes.
# first() returns a single Row (a tuple), which unpacks into the two aggregates.
min_age, max_age = df.selectExpr("min(Age)", "max(Age)").first()

In [17]:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [16]:
def scaled_age(age):
    """Min-max scale ``age`` using the notebook-level ``min_age`` and ``max_age``.

    Returns ``(age - min_age) / (max_age - min_age)``.
    """
    age_range = max_age - min_age
    return (age - min_age) / age_range

In [19]:
# Register the scaler as a Pandas UDF that produces a double column.
scaling_udf = pandas_udf(f=scaled_age, returnType=DoubleType())

In [20]:
# Show the first ten rows with the scaled age appended, without truncating cell values.
df.withColumn("scaled_age", scaling_udf(df['Age'])).show(n=10, truncate=False)

+------------+------+--------------+---+---------------+--------+---------------+------------+---------------------------------+--------------+-------------+---------------------+----------------------+--------------+----------------------+----------------+----------------+----------------+---------------+-----------+---------------+--------------------------+------------------------+------------+--------------------+
|satisfaction|Gender|Customer Type |Age|Type of Travel |Class   |Flight Distance|Seat comfort|Departure/Arrival time convenient|Food and drink|Gate location|Inflight wifi service|Inflight entertainment|Online support|Ease of Online booking|On-board service|Leg room service|Baggage handling|Checkin service|Cleanliness|Online boarding|Departure Delay in Minutes|Arrival Delay in Minutes|age_category|scaled_age          |
+------------+------+--------------+---+---------------+--------+---------------+------------+---------------------------------+--------------+-------------