In [None]:
## Installation
# Environment bootstrap for a hosted notebook runtime: refresh the apt index,
# install a headless JDK 8, download and unpack a Spark 3.2.1 distribution,
# then install the Python-side packages (findspark, pyspark, py4j).
!sudo apt update
# -qq plus the /dev/null redirect suppresses the installer's console output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# NOTE(review): wget runs with -q, so a failed download is silent and would only
# surface as an error in the tar step below — confirm this URL still resolves.
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
# NOTE(review): pyspark and py4j are installed without version pins, so the
# pip-installed pyspark may not match the 3.2.1 tarball unpacked above.
!pip install pyspark
!pip install py4j

[33m0% [Working][0m            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
[33m0% [Connecting to archive.ubuntu.com] [Connected to cloud.r-project.org (108.15[0m                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
[33m0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadconte[0m                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
[33m0% [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.80[0m                                                                               Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m                                                                               0% [Connected to ppa.launchpadcontent.net (185.125.190.80)][0m                 

In [None]:
## Initialization
# Let findspark locate the Spark installation so that `pyspark` can be imported
# by the cells that follow.
import findspark
findspark.init()
# Last expression of the cell: displays the Spark location findspark resolved.
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

In [None]:
import pandas as pd
import pyspark
import pyspark.sql.functions as f
from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import spark_partition_id
from pyspark.sql.types import StructType, StructField, IntegerType, ShortType, StringType

In [None]:
def get_spark_session():
    """Return a SparkSession for the application named "Commerce Session".

    Uses ``getOrCreate()``, so an already-running session is reused instead of
    a second one being started.
    """
    builder = SparkSession.builder.appName("Commerce Session")
    return builder.getOrCreate()

In [None]:
def extract_data(spark, data_dir="/content/drive/MyDrive"):
    """Load the order and product CSV files and return one joined order-line DataFrame.

    Args:
        spark: Active SparkSession used to read the CSV files.
        data_dir: Directory (or URI prefix) that contains
            ``order_products__prior.csv``, ``order_products__train.csv``,
            ``products.csv``, ``aisles.csv`` and ``departments.csv``.
            Defaults to the Google Drive mount point the notebook was written for.

    Returns:
        A Spark DataFrame with one row per (order, product) line, restricted to
        orders whose ``order_id % 13 = 1`` and enriched with ``product_name``,
        ``aisle`` and ``department``. ``aisle_id`` and ``department_id`` are dropped.
    """
    # Normalise the prefix once so joined paths never contain a doubled slash.
    base_dir = data_dir.rstrip("/")

    def _read_csv(file_name, schema):
        # Every input has a header row; the explicit schema gives typed columns.
        return spark.read.csv(f"{base_dir}/{file_name}", header=True, schema=schema)

    struct_schema_order = StructType([
        StructField("order_id", IntegerType()),
        StructField("product_id", IntegerType()),
        StructField("add_to_cart_order", ShortType()),
        StructField("reordered", ShortType())
    ])
    struct_schema_product = StructType([
        StructField("product_id", IntegerType()),
        StructField("product_name", StringType()),
        StructField("aisle_id", IntegerType()),
        StructField("department_id", IntegerType())
    ])
    struct_schema_aisle = StructType([
        StructField("aisle_id", IntegerType()),
        StructField("aisle", StringType())
    ])
    struct_schema_depart = StructType([
        StructField("department_id", IntegerType()),
        StructField("department", StringType())
    ])

    # The "prior" and "train" files share one schema, so a positional union is safe.
    df_orders = (
        _read_csv("order_products__prior.csv", struct_schema_order)
        .union(_read_csv("order_products__train.csv", struct_schema_order))
        # Deterministic subsample: keep only orders whose id is 1 modulo 13.
        .filter("order_id % 13 = 1")
    )

    # Product dimension with the aisle and department names resolved.
    df_product = (
        _read_csv("products.csv", struct_schema_product)
        .join(_read_csv("aisles.csv", struct_schema_aisle), on="aisle_id", how="left")
        .join(_read_csv("departments.csv", struct_schema_depart), on="department_id", how="left")
        .drop("aisle_id", "department_id")
    )

    # Left join keeps order lines even when their product has no match.
    return df_orders.join(df_product, on="product_id", how="left")

In [None]:
def calculate_last_order(df):
    """Count, per department, the last-added cart items and all order lines.

    For every ``order_id`` the row whose ``add_to_cart_order`` equals the
    order's maximum is treated as the last item added to the cart.

    Args:
        df: Spark DataFrame with at least the columns ``order_id``,
            ``add_to_cart_order`` and ``department``.

    Returns:
        A pandas DataFrame with one row per department and the columns
        ``department``, ``last_order_count`` (rows that were the last item of
        their order) and ``total_count`` (all rows), sorted by department.
        The two counts are aligned by department name rather than by row
        position, because Spark does not guarantee the row order of a
        ``groupBy`` result.
    """
    # df is scanned twice below, so cache it. Remember whether the caller had
    # already cached it so that only a cache created here is released.
    was_cached = df.is_cached
    if not was_cached:
        df.persist()

    try:
        # Alternative logic kept from the original for reference:
        # df_last_order_short = df.orderBy(["order_id", "add_to_cart_order"]).groupBy("order_id").agg(f.last("add_to_cart_order")).toPandas()

        w = Window.partitionBy('order_id')
        df_last_order = (
            df.withColumn('max_add_to_cart_order', f.max('add_to_cart_order').over(w))
            .where(f.col('add_to_cart_order') == f.col('max_add_to_cart_order'))
            .drop('max_add_to_cart_order')
        )

        last_counts = (
            df_last_order.groupby("department").count().toPandas()
            .rename(columns={"count": "last_order_count"})
        )
        total_counts = (
            df.groupby("department").count().toPandas()
            .rename(columns={"count": "total_count"})
        )
    finally:
        if not was_cached:
            df.unpersist()

    # Every department in last_counts also occurs in total_counts (it is a
    # subset of the same rows), so a left merge from the totals keeps them all.
    department_count = total_counts.merge(last_counts, on="department", how="left")
    # Departments that never hold the last cart position get 0, not NaN.
    department_count["last_order_count"] = (
        department_count["last_order_count"].fillna(0).astype("int64")
    )
    department_count = (
        department_count[["department", "last_order_count", "total_count"]]
        .sort_values("department")
        .reset_index(drop=True)
    )

    return department_count


In [None]:
%%time
def main():
    """Run the pipeline end to end and print the department counts as a Markdown table."""
    session = get_spark_session()
    orders = extract_data(session)
    print(calculate_last_order(orders).to_markdown())


if __name__ == "__main__":
    main()

|    | department      |   count | department      |   count |
|---:|:----------------|--------:|:----------------|--------:|
|  0 | meat seafood    |    5709 | beverages       |  215922 |
|  1 | beverages       |   25659 | meat seafood    |   56257 |
|  2 | frozen          |   18980 | frozen          |  180360 |
|  3 | deli            |    8513 | deli            |   83776 |
|  4 | dry goods pasta |    6662 | dry goods pasta |   70196 |
|  5 | bulk            |     335 | other           |    3033 |
|  6 | other           |     440 | babies          |   34000 |
|  7 | babies          |    2240 | bakery          |   94286 |
|  8 | bakery          |    9078 | produce         |  760789 |
|  9 | produce         |   66449 | pantry          |  150276 |
| 10 | pantry          |   17567 | dairy eggs      |  432076 |
| 11 | dairy eggs      |   36412 | canned goods    |   85785 |
| 12 | canned goods    |    7246 | personal care   |   36012 |
| 13 | personal care   |    5744 | breakfast       |   

In [None]:
# %%time
# df = df.orderBy(["order_id", "add_to_cart_order"])

# df_temp = df.select(*df.columns, f.lag("aisle").over(w.partitionBy(f.lit(1)).orderBy("order_id")).alias("aisle_before"))
# df_temp = df_temp.filter(f.col("aisle") != f.col("aisle_before"))
# df_temp = df_temp.filter(f.col("add_to_cart_order") != 1).toPandas()