In [1]:
from shared import extract, clean_data, get_spark, transform_data, REFRESH_MODE

etl_mode = REFRESH_MODE

df = extract(etl_mode)

INFO:root:Started the EXTRACT phase with etl_mode='refresh'
INFO:root:Skipping downloading and unzipping as they are time-consuming
INFO:root:Converting excel to csv with pandas
INFO:root:Successfully converted file csv_path='data/base/online_retail_2024-10-16-19-55-34.csv'
INFO:root:Finished the EXTRACT phase; Got a df with 541909 records 



In [2]:
df = clean_data(df)

In [3]:
spark = get_spark()
df.createOrReplaceTempView("df")
df.show()

+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|invoice_no|stock_code|         description|quantity|       invoice_date|unit_price|customer_id|       country|
+----------+----------+--------------------+--------+-------------------+----------+-----------+--------------+
|    536365|     21730|GLASS STAR FROSTE...|       6|2010-12-01 08:26:00|      4.25|      17850|United Kingdom|
|    536365|     71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|      3.39|      17850|United Kingdom|
|    536366|     22632|HAND WARMER RED P...|       6|2010-12-01 08:28:00|      1.85|      17850|United Kingdom|
|    536367|     22622|BOX OF VINTAGE AL...|       2|2010-12-01 08:34:00|      9.95|      13047|United Kingdom|
|    536367|     22745|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|       2.1|      13047|United Kingdom|
|    536367|     22748|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|       2.1|      13047|United K

In [4]:
# 1.2.a)
spark.sql("""
select
    customer_id,
    ROUND(SUM(quantity * unit_price), 2) as total_revenue
from df
group by customer_id
order by total_revenue desc
limit 50
""").show()

+-----------+-------------+
|customer_id|total_revenue|
+-----------+-------------+
|      14646|    280206.02|
|      18102|     259657.3|
|      17450|    194390.79|
|      16446|     168472.5|
|      14911|    143711.17|
|      12415|    124914.53|
|      14156|    117210.08|
|      17511|     91062.38|
|      16029|     80850.84|
|      12346|      77183.6|
|      16684|     66653.56|
|      14096|     65164.79|
|      13694|     65039.62|
|      15311|     60632.75|
|      13089|     58762.08|
|      17949|     58510.48|
|      15769|     56252.72|
|      15061|     54534.14|
|      14298|      51527.3|
|      14088|     50491.81|
+-----------+-------------+
only showing top 20 rows



In [5]:
# 1.2.b)
spark.sql("""
with country_rank as (
select
    country,
    stock_code,
    SUM(quantity) as quantity_sum,
    ROW_NUMBER() OVER (PARTITION BY country ORDER BY SUM(quantity) DESC) as rn
from df
group by country, stock_code
)
select * from country_rank
where rn = 1
order by quantity_sum desc;
""").show(40)

+--------------------+----------+------------+---+
|             country|stock_code|quantity_sum| rn|
+--------------------+----------+------------+---+
|      United Kingdom|     23843|       80995|  1|
|         Netherlands|     23084|        4801|  1|
|              France|     23084|        4000|  1|
|               Japan|     23084|        3408|  1|
|           Australia|     22492|        2952|  1|
|              Sweden|     22492|        2916|  1|
|                EIRE|     22197|        1828|  1|
|             Germany|     22326|        1221|  1|
|               Spain|    84997D|        1089|  1|
|         Switzerland|     22554|         636|  1|
|              Norway|     16008|         576|  1|
|             Finland|    84997D|         552|  1|
|              Canada|     37370|         504|  1|
|             Belgium|     21212|         480|  1|
|     Channel Islands|     21785|         407|  1|
|              Cyprus|     22335|         384|  1|
|           Singapore|     2233

In [6]:
# 1.2.c)
spark.sql("""
select
    YEAR(invoice_date) as year,
    MONTH(invoice_date) as month,
    ROUND(SUM(quantity * unit_price), 2) as total_revenue
    -- min(invoice_date), max(invoice_date)
from df
group by year, month
order by year, month
""").show(40)

+----+-----+-------------+
|year|month|total_revenue|
+----+-----+-------------+
|2010|   12|    570422.73|
|2011|    1|    568101.31|
|2011|    2|    446084.92|
|2011|    3|    594081.76|
|2011|    4|    468374.33|
|2011|    5|     677340.3|
|2011|    6|    660046.05|
|2011|    7|     598962.9|
|2011|    8|    644051.04|
|2011|    9|     950686.3|
|2011|   10|   1035642.45|
|2011|   11|   1155668.61|
|2011|   12|    517190.44|
+----+-----+-------------+



In [7]:
df = transform_data(df)
df.createOrReplaceTempView("df")

In [8]:
# 2.1.c)
spark.sql("""
select
    customer_id,
    order_year,
    order_month,
    ROUND(SUM(total_amount), 2) as monthly_total
from df
group by customer_id, order_year, order_month
""").show()

+-----------+----------+-----------+-------------+
|customer_id|order_year|order_month|monthly_total|
+-----------+----------+-----------+-------------+
|      13001|      2011|          2|      2098.95|
|      15529|      2011|          4|       198.91|
|      17706|      2010|         12|       477.83|
|      12621|      2010|         12|      1383.65|
|      16843|      2011|          2|       1240.4|
|      13854|      2011|          3|       927.18|
|      15159|      2010|         12|      1860.98|
|      13873|      2011|          2|       121.45|
|      14709|      2010|         12|        314.1|
|      16767|      2011|          1|      1866.02|
|      14897|      2011|          2|        175.5|
|      12754|      2011|          3|        16.05|
|      13327|      2010|         12|        745.0|
|      17961|      2011|          3|        64.82|
|      16764|      2011|          8|       172.42|
|      14442|      2011|          5|       914.24|
|      13225|      2011|       