In [2]:
import os

In [3]:
from pyspark.sql import SparkSession

In [4]:
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.postgresql:postgresql:42.2.19 --driver-class-path /home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.19.jar pyspark-shell"

In [5]:
spark = (
    SparkSession.builder.appName("Spark Notebook")
    .getOrCreate()
)

In [6]:
%load_ext jupyterlab_sql_editor.ipython_magic.sparksql

In [7]:
%config SparkSql.outputFile="/tmp/sparkdb.schema.json"

In [8]:
def load_postgres_to_spark_temp_view(
    spark, jdbc_url, table_name, user, password, temp_view_name
):
    """
    Loads a table from PostgreSQL into Spark and registers it as a temporary view.

    Args:
        spark (SparkSession): Active Spark session.
        jdbc_url (str): JDBC URL for PostgreSQL.
        table_name (str): Name of the table in PostgreSQL.
        user (str): PostgreSQL database user.
        password (str): PostgreSQL database password.
        temp_view_name (str): Name of the temporary view in Spark.

    Returns:
        None
    """
    df = (
        spark.read.format("jdbc")
        .options(
            url=jdbc_url,
            dbtable=table_name,
            user=user,
            password=password,
            driver="org.postgresql.Driver"
        )
        .load()
    )

    df.createOrReplaceTempView(temp_view_name)

In [9]:
table_to_load = ["managers", "orders", "returns"]
conn_params = {
    "jdbc_url": "jdbc:postgresql://poplin-postgres:5432/poplin-store",
    "user": "postgres",
    "password": "secretpassword",
}

for table in table_to_load:
    args = {
        **{"spark": spark, "table_name": table, "temp_view_name": table},
        **conn_params,
    }
    load_postgres_to_spark_temp_view(**args)

In [10]:
%sparksql --refresh all

Exporting functions: [########################################] 100.0%
Schema file updated: /tmp/sparkdb.schema.json


In [11]:
%sparksql SHOW TABLES

Execution time: 0.08 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

namespace,tableName,isTemporary
,managers,True
,orders,True
,returns,True


In [12]:
%%sparksql --output text --limit 20
select * from orders limit 20

Execution time: 0.20 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+---+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+----------------------------------------------------------------------------+--------+--------+--------+--------+
|id |order_id      |order_date|ship_date |ship_mode     |customer_id|customer_name     |segment    |country      |city           |state         |postal_code|region |product_id     |category       |sub_category|product_name                                                                |sales   |quantity|discount|profit  |
+---+--------------+----------+----------+--------------+-----------+------------------+-----------+-------------+---------------+--------------+-----------+-------+---------------+---------------+------------+----------------------------------------------------------------------------+--------+--------+--------+--------+
|1  |CA-2016-152156|2016-11-

In [13]:
%%sparksql --output text --limit 20
select * from managers limit 20

Execution time: 0.06 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+-----------------+-------+
|manager          |region |
+-----------------+-------+
|Anna Andreadi    |West   |
|Chuck Magee      |East   |
|Kelly Williams   |Central|
|Cassandra Brandow|South  |
+-----------------+-------+



In [14]:
%%sparksql --output text --limit 20
select * from returns limit 20

Execution time: 0.05 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+--------+--------------+
|returned|order_id      |
+--------+--------------+
|Yes     |CA-2017-153822|
|Yes     |CA-2017-129707|
|Yes     |CA-2014-152345|
|Yes     |CA-2015-156440|
|Yes     |US-2017-155999|
|Yes     |CA-2014-157924|
|Yes     |CA-2017-131807|
|Yes     |CA-2016-124527|
|Yes     |CA-2017-135692|
|Yes     |CA-2014-123225|
|Yes     |CA-2017-145772|
|Yes     |US-2014-105137|
|Yes     |CA-2017-101805|
|Yes     |CA-2016-111682|
|Yes     |CA-2017-131492|
|Yes     |CA-2015-104129|
|Yes     |CA-2017-117926|
|Yes     |US-2016-115952|
|Yes     |CA-2015-155761|
|Yes     |CA-2017-100111|
+--------+--------------+



# QA

In [15]:
%%sparksql --output text --limit 20

-- Check for regions in orders that do not have a matching manager

select distinct o.region
from orders o
left join managers m on o.region = m.region
where m.region is null;

Execution time: 0.52 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+------+
|region|
+------+
+------+



In [16]:
%%sparksql --output text --limit 20

-- Identify orders where the ship date is before the order date

select order_id, order_date, ship_date
from orders
where order_date > ship_date;

Execution time: 0.08 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+--------+----------+---------+
|order_id|order_date|ship_date|
+--------+----------+---------+
+--------+----------+---------+



In [17]:
%%sparksql --output text --limit 20

-- Check for missing values in ship_mode, segment, or category

select *
from orders
where ship_mode is null or segment is null or category is null;

Execution time: 0.05 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+---+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|id |order_id|order_date|ship_date|ship_mode|customer_id|customer_name|segment|country|city|state|postal_code|region|product_id|category|sub_category|product_name|sales|quantity|discount|profit|
+---+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
+---+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+



In [18]:
%%sparksql --output text --limit 20

-- Check for invalid discount values

select order_id, discount
from orders
where discount < 0 or discount > 1;

Execution time: 0.06 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+--------+--------+
|order_id|discount|
+--------+--------+
+--------+--------+



In [19]:
%%sparksql --output text --limit 20

-- Identify orders with order_id values that do not match the expected pattern 'XX-YYYY-ZZZZZ', 
-- where 'XX' represents two uppercase letters, 'YYYY' represents four digits, 
-- and 'ZZZZZ' represents a sequence of one or more digits.

select 
      trim(order_id) as order_id
    , length(trim(order_id)) as length
    , hex(trim(order_id)) as hex_representation
    , case 
        when trim(order_id) rlike '^[A-Z]{2}-\\d{4}-\\d+$' then 'valid'
        else 'invalid'
      end as order_id_status
from orders
where 
    trim(order_id) is not null
  and not (trim(order_id) rlike '^[A-Z]{2}-\\d{4}-\\d+$');

Execution time: 0.09 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+--------+------+------------------+---------------+
|order_id|length|hex_representation|order_id_status|
+--------+------+------------------+---------------+
+--------+------+------------------+---------------+



In [20]:
%%sparksql --output text --limit 20

-- Identify rows with null values in key columns

select *
from orders
where order_id is null or customer_id is null or sales is null;

Execution time: 0.04 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+---+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|id |order_id|order_date|ship_date|ship_mode|customer_id|customer_name|segment|country|city|state|postal_code|region|product_id|category|sub_category|product_name|sales|quantity|discount|profit|
+---+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
+---+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+



In [21]:
%%sparksql --output text --limit 20

-- Check for customers with multiple associated names

select 
    customer_id
  , count(distinct customer_name) as name_variants
from orders
group by customer_id
having name_variants > 1;

Execution time: 0.59 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+-----------+-------------+
|customer_id|name_variants|
+-----------+-------------+
+-----------+-------------+



In [22]:
%%sparksql --output text --limit 20

-- Check for orphan returns in the orders dataset

with returns_orders as (
    select lower(trim(order_id)) as order_id from returns group by 1
),
all_orders as (
    select lower(trim(order_id)) as order_id from orders group by 1
)
select ro.*
from 
    returns_orders ro
    left join all_orders ao 
    on ao.order_id=ro.order_id
where ro.order_id is null
;

Execution time: 0.09 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+--------+
|order_id|
+--------+
+--------+



# Exploration

In [23]:
%%sparksql --output text --limit 20

-- Analyze total sales and profit by customer segment

select 
    segment
  , sum(sales) as total_sales
  , sum(profit) as total_profit
from orders
group by segment
order by total_sales desc;

Execution time: 0.17 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+-----------+------------------+------------------+
|segment    |total_sales       |total_profit      |
+-----------+------------------+------------------+
|Consumer   |1161401.3449999888|134119.20919999972|
|Corporate  |706146.3668000001 |91979.13400000018 |
|Home Office|429653.1485000003 |60298.678500000075|
+-----------+------------------+------------------+



In [24]:
%%sparksql --output text --limit 20

-- Analyze total sales and profit product category

select 
    category
  , sum(sales) as total_sales
  , sum(profit) as total_profit
from orders
group by 1
order by 2 desc;

Execution time: 0.10 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+---------------+-----------------+-----------------+
|category       |total_sales      |total_profit     |
+---------------+-----------------+-----------------+
|Technology     |836154.0329999966|145454.9480999999|
|Furniture      |741999.7952999998|18451.2728       |
|Office Supplies|719047.0320000029|122490.8008000001|
+---------------+-----------------+-----------------+



In [25]:
%%sparksql --output text --limit 20

-- Calculate return rate by customer segment

select 
    o.segment
  , count(distinct r.order_id) as returned_orders
  , count(distinct o.order_id) as total_orders
  , round((count(distinct r.order_id) * 100.0 / count(distinct o.order_id)),3) as return_rate
from orders o
    left join returns r on o.order_id = r.order_id
group by 1
order by 4 desc;

Execution time: 0.66 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+-----------+---------------+------------+-----------+
|segment    |returned_orders|total_orders|return_rate|
+-----------+---------------+------------+-----------+
|Corporate  |93             |1514        |6.143      |
|Consumer   |154            |2586        |5.955      |
|Home Office|49             |909         |5.391      |
+-----------+---------------+------------+-----------+



In [26]:
%%sparksql --output text --limit 20

-- Calculate return rate by product category

select 
    o.category
  , count(distinct r.order_id) as returned_orders
  , count(distinct o.order_id) as total_orders
  , round((count(distinct r.order_id) * 100.0 / count(distinct o.order_id)),3) as return_rate
from orders o
    left join returns r on o.order_id = r.order_id
group by o.category
order by return_rate desc;

Execution time: 0.52 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+---------------+---------------+------------+-----------+
|category       |returned_orders|total_orders|return_rate|
+---------------+---------------+------------+-----------+
|Technology     |123            |1544        |7.966      |
|Furniture      |136            |1764        |7.710      |
|Office Supplies|234            |3742        |6.253      |
+---------------+---------------+------------+-----------+



In [27]:
%%sparksql --output text --limit 20

-- Calculate the distribution of orders per customer_id

select 
    customer_id,
    count(order_id) as total_orders
from orders
group by customer_id
order by total_orders desc;


Execution time: 0.24 seconds
Only showing top 20 rows


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+-----------+------------+
|customer_id|total_orders|
+-----------+------------+
|WB-21850   |37          |
|JL-15835   |34          |
|PP-18955   |34          |
|MA-17560   |34          |
|JD-15895   |32          |
|CK-12205   |32          |
|SV-20365   |32          |
|EH-13765   |32          |
|AP-10915   |31          |
|EP-13915   |31          |
|ZC-21910   |31          |
|LC-16870   |30          |
|Dp-13240   |29          |
|SH-19975   |29          |
|GT-14710   |29          |
|BM-11650   |29          |
|KL-16645   |29          |
|CL-12565   |28          |
|XP-21865   |28          |
|KM-16720   |28          |
+-----------+------------+



In [28]:
%%sparksql --output text --limit 20

-- Analyze the number of orders and total sales by region

select 
    region
  , count(distinct order_id) as num_orders
  , round(sum(sales),2) as total_sales
from orders
group by region
order by num_orders desc;

Execution time: 0.34 seconds


SparkSchemaWidget(nodes=(Node(close_icon='angle-down', close_icon_style='danger', icon='project-diagram', icon…

+-------+----------+-----------+
|region |num_orders|total_sales|
+-------+----------+-----------+
|West   |1611      |725457.82  |
|East   |1401      |678781.24  |
|Central|1175      |501239.89  |
|South  |822       |391721.91  |
+-------+----------+-----------+



In [29]:
spark.stop()