<a href="https://colab.research.google.com/github/ipw0630/project_etl/blob/main/Assignment_14_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install pyspark



# Extract Data Sets

In [2]:
! wget 'https://raw.githubusercontent.com/ipw0630/project_etl/main/raw_data/calendar.csv'

--2024-05-21 15:39:39--  https://raw.githubusercontent.com/ipw0630/project_etl/main/raw_data/calendar.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 124247 (121K) [text/plain]
Saving to: ‘calendar.csv.3’


2024-05-21 15:39:39 (1.40 MB/s) - ‘calendar.csv.3’ saved [124247/124247]



In [3]:
! wget 'https://raw.githubusercontent.com/ipw0630/project_etl/main/raw_data/customer_flight_activity.csv'

--2024-05-21 15:39:39--  https://raw.githubusercontent.com/ipw0630/project_etl/main/raw_data/customer_flight_activity.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14101562 (13M) [text/plain]
Saving to: ‘customer_flight_activity.csv.3’


2024-05-21 15:39:40 (36.7 MB/s) - ‘customer_flight_activity.csv.3’ saved [14101562/14101562]



In [4]:
! wget 'https://raw.githubusercontent.com/ipw0630/project_etl/main/raw_data/customer_loyalty_history.csv'

--2024-05-21 15:39:40--  https://raw.githubusercontent.com/ipw0630/project_etl/main/raw_data/customer_loyalty_history.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1626948 (1.6M) [text/plain]
Saving to: ‘customer_loyalty_history.csv.3’


2024-05-21 15:39:41 (7.09 MB/s) - ‘customer_loyalty_history.csv.3’ saved [1626948/1626948]



In [5]:
from pyspark.sql import SparkSession

In [6]:
# Create (or reuse) the Spark session that the CSV loaders in this notebook read through.
spark_session = (
    SparkSession.builder
    .appName('assignment_14_dibimbing')
    .master('local')
    .getOrCreate()
)

# Data Cleaning & Transformation

In [7]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
# Calendar schema: an integer 'co' column followed by four date columns.
# All fields are nullable (nullable=True is also the StructField default).
calendar_date_columns = (
    'date',
    'start_of_the_year',
    'start_of_the_quarter',
    'start_of_the_month',
)
schema = StructType(
    [StructField('co', IntegerType(), True)]
    + [StructField(name, DateType(), True) for name in calendar_date_columns]
)

# Read the CSV with the explicit schema (the header row is skipped, not used for typing)
# and preview the first five rows.
df1 = spark_session.read.csv(
    '/content/calendar.csv',
    schema=schema,
    header=True,
)
df1.show(n=5)

+---+----------+-----------------+--------------------+------------------+
| co|      date|start_of_the_year|start_of_the_quarter|start_of_the_month|
+---+----------+-----------------+--------------------+------------------+
|  0|2012-01-01|       2012-01-01|          2012-01-01|        2012-01-01|
|  1|2012-01-02|       2012-01-01|          2012-01-01|        2012-01-01|
|  2|2012-01-03|       2012-01-01|          2012-01-01|        2012-01-01|
|  3|2012-01-04|       2012-01-01|          2012-01-01|        2012-01-01|
|  4|2012-01-05|       2012-01-01|          2012-01-01|        2012-01-01|
+---+----------+-----------------+--------------------+------------------+
only showing top 5 rows



In [8]:
# Define the schema and load the DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import col, concat, lit
# Column name / Spark type pairs for customer_flight_activity.csv, in file order.
flight_activity_fields = [
    ('co', IntegerType()),
    ('loyalty_number', StringType()),
    ('year', StringType()),
    ('month', StringType()),
    ('total_flights', IntegerType()),
    ('points_accumulated', IntegerType()),
    ('points_redeemed', DoubleType()),
    ('dollar_cost_points_redeemed', IntegerType()),
]
schema = StructType([StructField(name, dtype) for name, dtype in flight_activity_fields])

# Read the CSV with the explicit schema and preview the first five rows.
df2 = spark_session.read.csv(
    '/content/customer_flight_activity.csv',
    schema=schema,
    header=True,
)
df2.show(n=5)

+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+
| co|loyalty_number|year|month|total_flights|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+
|  0|        100590|2018|    6|           12|             15276|        22914.0|                          0|
|  1|        100590|2018|    7|           12|              9168|        13752.0|                          0|
|  2|        100590|2018|    5|            4|              6504|         9756.0|                          0|
|  3|        100590|2018|   10|            0|                 0|            0.0|                        512|
|  4|        100590|2018|    2|            0|                 0|            0.0|                          0|
+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+
only showing top 5 

In [9]:
# Data Cleaning and Transformation (without filtering)

# Replace NULLs with a per-column default: 0 for the integer columns, 0.0 for the
# double column and the literal 'UNKNOWN' for the string columns.
# NOTE(review): 'year' and 'month' are cast to integers in the following step, which
# presumably turns 'UNKNOWN' back into NULL — confirm this is intended.
zero_defaults = dict.fromkeys(
    ['co', 'total_flights', 'points_accumulated', 'dollar_cost_points_redeemed'],
    0,
)
unknown_defaults = dict.fromkeys(['loyalty_number', 'year', 'month'], 'UNKNOWN')

df2 = df2.fillna({**zero_defaults, **unknown_defaults, 'points_redeemed': 0.0})

In [10]:
# Cast the string-typed 'year' and 'month' columns to integers.
# Values that are not valid integers become NULL after the cast.

df2 = (
    df2
    .withColumn('year', col('year').cast('int'))
    .withColumn('month', col('month').cast('int'))
)

In [11]:
# Keep a single row per (loyalty_number, year, month) combination.
flight_key_columns = ['loyalty_number', 'year', 'month']
df2 = df2.dropDuplicates(flight_key_columns)

In [12]:
# Preview five rows of the de-duplicated flight activity data.
df2.show(n=5)

+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+
| co|loyalty_number|year|month|total_flights|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+
| 31|        102788|2018|    4|            0|                 0|            0.0|                          0|
|446|        135257|2018|    2|            0|                 0|            0.0|                          0|
|449|        135257|2018|    9|            0|                 0|            0.0|                          0|
|621|        148289|2018|    2|            0|                 0|            0.0|                          0|
|771|        154868|2018|    6|           16|             14176|        21264.0|                          0|
+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+
only showing top 5 

In [13]:
# Add 'year_month' as the text "<year>-<month>" (for example "2018-4").
# concat yields NULL whenever either part is NULL.
year_month_expr = concat(col('year'), lit('-'), col('month'))
df2 = df2.withColumn('year_month', year_month_expr)

In [14]:
# Preview five rows, now including the derived 'year_month' column.
df2.show(n=5)

+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+----------+
| co|loyalty_number|year|month|total_flights|points_accumulated|points_redeemed|dollar_cost_points_redeemed|year_month|
+---+--------------+----+-----+-------------+------------------+---------------+---------------------------+----------+
| 31|        102788|2018|    4|            0|                 0|            0.0|                          0|    2018-4|
|446|        135257|2018|    2|            0|                 0|            0.0|                          0|    2018-2|
|449|        135257|2018|    9|            0|                 0|            0.0|                          0|    2018-9|
|621|        148289|2018|    2|            0|                 0|            0.0|                          0|    2018-2|
|771|        154868|2018|    6|           16|             14176|        21264.0|                          0|    2018-6|
+---+--------------+----+-----+---------

# VISUALIZATION

In [15]:
# Convert PySpark DataFrame to Pandas DataFrame.
# toPandas() collects every row of df2 into one in-memory pandas DataFrame.

pdf = df2.toPandas()

In [16]:
import plotly.express as px

In [17]:
# Use Plotly to create visualizations

# `pdf` holds one row per customer and month, so sum the flights per 'year_month'
# before plotting. A pie trace adds up values that share a label, so the slices are
# the same as when plotting `pdf` directly, but the figure no longer carries every
# individual row. dropna=False keeps rows whose 'year_month' is missing as their own group.
flights_by_year_month = (
    pdf.groupby('year_month', as_index=False, dropna=False)['total_flights'].sum()
)

fig = px.pie(
    flights_by_year_month,
    values='total_flights',
    names='year_month',
    title='Total Flights by Year and Month',
)

# Define layout (optional). update_layout returns the figure, so as the last
# expression of the cell it also renders the chart.
fig.update_layout(
    font=dict(
        family="Arial",
        size=12,
        color="RebeccaPurple"
    )
)

In [18]:
!pip install -U kaleido



In [19]:
# Export the pie chart to a PNG file in the current working directory.
flights_chart_file = "Total Flights by Year and Month.png"
fig.write_image(flights_chart_file)

In [20]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit
import pandas as pd
# Column name / Spark type pairs for customer_loyalty_history.csv, in file order.
# NOTE(review): 'loyalty_number' is an integer here but a string in the flight
# activity schema — confirm which type is intended before joining the two frames.
loyalty_history_fields = [
    ('co', IntegerType()),
    ('loyalty_number', IntegerType()),
    ('country', StringType()),
    ('province', StringType()),
    ('city', StringType()),
    ('postal_code', StringType()),
    ('gender', StringType()),
    ('education', StringType()),
    ('salary', DoubleType()),
    ('marital_status', StringType()),
    ('loyalty_card', StringType()),
    ('customer_lifetime_value', StringType()),
    ('enrollment_type', StringType()),
    ('enrollment_year', StringType()),
    ('enrollment_month', StringType()),
    ('cancellation_month', DoubleType()),
    ('cancellation_year', StringType()),
]
schema = StructType([StructField(name, dtype) for name, dtype in loyalty_history_fields])

# Read the CSV with the explicit schema and preview the first five rows.
df3 = spark_session.read.csv(
    '/content/customer_loyalty_history.csv',
    schema=schema,
    header=True,
)
df3.show(n=5)

+---+--------------+-------+----------------+---------+-----------+------+---------+--------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+-----------------+
| co|loyalty_number|country|        province|     city|postal_code|gender|education|  salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month|cancellation_month|cancellation_year|
+---+--------------+-------+----------------+---------+-----------+------+---------+--------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+-----------------+
|  0|        480934| Canada|         Ontario|  Toronto|    M2Z 4K1|Female| Bachelor| 83236.0|       Married|        Star|                   NULL|       Standard|           NULL|            NULL|              NULL|             NULL|
|  1|        549612| Canada|         Alberta| Edmonton|    T3G 6Y6|  Mal

In [21]:
# Replace NULLs with a per-column default, grouped by the kind of placeholder used.
# NOTE(review): 'enrollment_year', 'enrollment_month' and 'cancellation_year' are cast
# to integers in the following step, which presumably turns 'UNKNOWN' back into NULL —
# confirm this is intended.
integer_defaults = dict.fromkeys(['co', 'loyalty_number'], 0)
double_defaults = dict.fromkeys(['salary', 'cancellation_month'], 0.0)
unknown_defaults = dict.fromkeys(
    [
        'country',
        'province',
        'city',
        'postal_code',
        'gender',
        'education',
        'marital_status',
        'loyalty_card',
        'enrollment_type',
        'enrollment_year',
        'enrollment_month',
        'cancellation_year',
    ],
    'UNKNOWN',
)

df3 = df3.fillna({
    **integer_defaults,
    **double_defaults,
    **unknown_defaults,
    'customer_lifetime_value': '0',  # string column, so the default is the text '0'
})

In [22]:
# Cast the string-typed year/month columns to integers.
# Values that are not valid integers become NULL after the cast.
df3 = (
    df3
    .withColumn('enrollment_year', col('enrollment_year').cast('int'))
    .withColumn('enrollment_month', col('enrollment_month').cast('int'))
    .withColumn('cancellation_year', col('cancellation_year').cast('int'))
)


In [23]:
# Keep a single row per (loyalty_number, enrollment_year, enrollment_month) combination.
customer_key_columns = ['loyalty_number', 'enrollment_year', 'enrollment_month']
df3 = df3.dropDuplicates(customer_key_columns)


In [24]:
# Add 'enrollment_date' as the text "<enrollment_year>-<enrollment_month>".
# concat yields NULL whenever either part is NULL.
enrollment_date_expr = concat(col('enrollment_year'), lit('-'), col('enrollment_month'))
df3 = df3.withColumn('enrollment_date', enrollment_date_expr)


In [25]:
# Create a new column 'cancellation_date' as "<cancellation_year>-<cancellation_month>".
# 'cancellation_month' is loaded as a double, so it is cast to an integer first;
# concatenating the double directly would produce text such as "2018-3.0" instead of
# the "<year>-<month>" form used for 'enrollment_date'.
# concat yields NULL whenever 'cancellation_year' is NULL.
cancellation_month_int = col('cancellation_month').cast(IntegerType())
df3 = df3.withColumn(
    'cancellation_date',
    concat(col('cancellation_year'), lit('-'), cancellation_month_int),
)
# Show the cleaned and transformed DataFrame
df3.show(5)

+-----+--------------+-------+----------------+------------+-----------+------+---------+-------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+-----------------+---------------+-----------------+
|   co|loyalty_number|country|        province|        city|postal_code|gender|education| salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month|cancellation_month|cancellation_year|enrollment_date|cancellation_date|
+-----+--------------+-------+----------------+------------+-----------+------+---------+-------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+-----------------+---------------+-----------------+
| 2320|        100018| Canada|         Alberta|    Edmonton|    T9G 1W3|Female| Bachelor|92552.0|       Married|      Aurora|                      0|       Standard|           NULL|         

# VISUALIZATION

In [26]:
# Convert PySpark DataFrame to Pandas DataFrame.
# Note: this rebinds `pdf`, which previously held the pandas copy of df2.
pdf = df3.toPandas()

In [35]:
# Create a box plot of salary by gender.
# Missing salaries were replaced with 0.0 during cleaning; leave those placeholder
# rows out so they are not plotted as real salaries and do not drag the quartiles
# towards zero.
# NOTE(review): this also drops any salary that is genuinely 0 — confirm none exist.
salary_known = pdf[pdf['salary'] != 0.0]
fig_box = px.box(salary_known, x='gender', y='salary', title='Salary Distribution by Gender',
                 labels={'gender': 'Gender', 'salary': 'Salary'})

# Show the box plot
fig_box.show()

# Save the box plot as an image
fig_box.write_image("salary_distribution_by_gender_box.png")


In [28]:
# Count customers per marital status. 'loyalty_number' is an identifier, so using it
# as the pie values would size each slice by the sum of the IDs rather than by the
# number of customers.
customers_by_marital_status = (
    pdf.groupby('marital_status', as_index=False)
    .agg(customers=('loyalty_number', 'size'))
)
fig_pie = px.pie(
    customers_by_marital_status,
    values='customers',
    names='marital_status',
    title='Customer Distribution by Marital Status',
)
fig_pie.show()
fig_pie.write_image("customer_distribution_by_marital_status.png")