In [None]:
## 설치
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
40 packages can be upgraded. Run 'apt list --upgradable' to see them.
tar: spark-3.2.1-bin-hadoop3.2.tgz: Cannot open: No such file or directory
tar: Error is not recov

In [None]:
## Initialization
import findspark
findspark.init()
# Last expression of the cell: its return value (the resolved Spark/PySpark location)
# is what the notebook displays below.
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

In [None]:
import pandas as pd
import pyspark
import pyspark.sql.functions as f
from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import spark_partition_id
from pyspark.sql.types import StructType, StructField, IntegerType, ShortType, StringType, DoubleType

In [None]:
def get_spark_session():
    """Return the SparkSession for this notebook.

    The session is obtained through ``getOrCreate()`` with the application
    name "Commerce Session".
    """
    session_builder = SparkSession.builder.appName("Commerce Session")
    return session_builder.getOrCreate()

In [None]:
def extract_data(spark, data_dir="/content/drive/MyDrive", order_filter="user_id % 13 = 1"):
    """Load the order CSV files and build the two DataFrames used by the notebook.

    Args:
        spark: SparkSession used to read the CSV files.
        data_dir: Directory that contains the CSV files. Defaults to the
            Google Drive mount location the notebook originally used.
        order_filter: SQL filter expression applied to the orders table.
            The default keeps the users whose ``user_id % 13`` equals 1.
            Pass ``None`` (or an empty string) to keep every order.

    Returns:
        A tuple ``(df_order_detail, df_order)``:
          * ``df_order_detail`` - rows of ``order_products__prior.csv`` and
            ``order_products__train.csv`` stacked together and left-joined
            with the product table (which carries the aisle and department
            names; ``aisle_id`` / ``department_id`` are dropped).
          * ``df_order`` - rows of ``orders.csv`` after ``order_filter``.
    """
    schema_order_detail = StructType([
        StructField("order_id", IntegerType()),
        StructField("product_id", IntegerType()),
        StructField("add_to_cart_order", ShortType()),
        StructField("reordered", ShortType())
    ])

    schema_order = StructType([
        StructField("order_id", IntegerType()),
        StructField("user_id", IntegerType()),
        StructField("eval_set", StringType()),
        StructField("order_number", ShortType()),
        StructField("order_dow", ShortType()),
        StructField("order_hour_of_day", ShortType()),
        StructField("days_since_prior_order", DoubleType(), True)
    ])

    schema_product = StructType([
        StructField("product_id", IntegerType()),
        StructField("product_name", StringType()),
        StructField("aisle_id", IntegerType()),
        StructField("department_id", IntegerType())
    ])

    schema_aisle = StructType([
        StructField("aisle_id", IntegerType()),
        StructField("aisle", StringType())
    ])

    schema_depart = StructType([
        StructField("department_id", IntegerType()),
        StructField("department", StringType())
    ])

    # Normalise the directory once so every path is built the same way
    # (the original products path contained an accidental "//").
    base_dir = data_dir.rstrip("/")

    def read_csv(file_name, schema):
        # All source files have a header row and are read with an explicit schema.
        return spark.read.csv(f"{base_dir}/{file_name}", header=True, schema=schema)

    # Order details: both files share one schema, so a positional union is safe.
    df_detail = (
        read_csv("order_products__prior.csv", schema_order_detail)
        .union(read_csv("order_products__train.csv", schema_order_detail))
    )

    # Product table enriched with aisle and department names; the surrogate
    # ids are dropped once the names have been joined in.
    df_product = (
        read_csv("products.csv", schema_product)
        .join(read_csv("aisles.csv", schema_aisle), on="aisle_id", how="left")
        .join(read_csv("departments.csv", schema_depart), on="department_id", how="left")
        .drop("aisle_id", "department_id")
    )

    df_order_detail = df_detail.join(df_product, on="product_id", how="left")

    df_order = read_csv("orders.csv", schema_order)
    if order_filter:
        df_order = df_order.filter(order_filter)

    return df_order_detail, df_order

In [None]:
# Obtain the Spark session and load the order-detail and order DataFrames.
spark = get_spark_session()
df_order_detail, df_order = extract_data(spark)

- 1. 재구매 주기 (df_order)
- 2. 평일 / 주말 선호 (df_order)
- 3. 총 구매 횟수 (df_order)
- 4. 1회 평균 구매 제품 수 (df_order_detail)

In [None]:
# Items per order, taken as the highest add_to_cart_order value within each order.
# NOTE(review): this equals the item count only if add_to_cart_order counts up
# from 1 without gaps - confirm against the data.
df_order_max = df_order_detail.groupBy("order_id").agg(f.max("add_to_cart_order").alias("item_count"))

In [None]:
# Attach item_count to every order; the left join keeps orders without detail rows.
# NOTE(review): df_order is overwritten here, so re-running this cell joins
# item_count onto a frame that already has that column.
df_order = df_order.join(df_order_max, on="order_id", how="left")
# persist() returns the DataFrame, which is what the cell displays below.
df_order.persist()

DataFrame[order_id: int, user_id: int, eval_set: string, order_number: smallint, order_dow: smallint, order_hour_of_day: smallint, days_since_prior_order: double, item_count: smallint]

In [None]:
%%time
# Print the first rows of the orders joined with item_count (timed by %%time).
df_order.show()

+--------+-------+--------+------------+---------+-----------------+----------------------+----------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|item_count|
+--------+-------+--------+------------+---------+-----------------+----------------------+----------+
|     148|  41523|   prior|          27|        2|               17|                   5.0|        14|
|     463|  92093|   prior|          18|        1|               17|                   5.0|         1|
|     496|   7580|   prior|           3|        5|               12|                   4.0|         3|
|    5803|  74530|   prior|          47|        1|               10|                   8.0|        27|
|    6654|  48998|   prior|          26|        1|               18|                  10.0|        17|
|    7554|  93770|   prior|          17|        2|               15|                   6.0|        10|
|   11033|  66886|   prior|           2|        6|               17|     

In [None]:
# Boolean flag: True when order_dow is 5 or 6, which this notebook treats as the weekend.
# NOTE(review): the mapping of order_dow values to weekdays is not shown in this
# notebook - confirm against the dataset documentation that 5 and 6 are Sat/Sun.
df_order = df_order.withColumn("order_weekend", f.col("order_dow") >= 5)

In [None]:
# One row per user with the four behavioural features listed above:
#   reorder_mean   - mean of days_since_prior_order
#   item_count     - mean of the per-order item_count
#   order_count    - number of order rows for the user
#   weekend_prefer - most frequent value of the order_weekend flag
orders_by_user = df_order.groupBy("user_id")
df_agg = orders_by_user.agg(
    f.mean("days_since_prior_order").alias("reorder_mean"),
    f.mean("item_count").alias("item_count"),
    f.count(f.lit(1)).alias("order_count"),
    f.mode("order_weekend").alias("weekend_prefer"),
)

In [None]:
%%time
# Persist the per-user aggregates, then print the first rows (timed by %%time).
df_agg.persist()
df_agg.show()

+-------+------------------+------------------+-----------+--------------+
|user_id|      reorder_mean|        item_count|order_count|weekend_prefer|
+-------+------------------+------------------+-----------+--------------+
|   7554|            8.8125|17.764705882352942|         17|         false|
|  10817|10.464285714285714|13.689655172413794|         29|         false|
|  57201|              24.4|              10.0|          6|         false|
|    833|21.181818181818183|              19.5|         12|         false|
|  23271|15.545454545454545|16.818181818181817|         12|         false|
|  34061|             13.25|               4.0|          9|         false|
|  40574| 4.918918918918919| 7.743243243243243|         75|         false|
|  88674|          11.03125|              11.0|         33|         false|
| 114206| 6.777777777777778|              38.2|         10|         false|
| 119432| 7.105263157894737| 9.842105263157896|         20|         false|
| 171094|20.6666666666666

In [None]:
# Summary statistics (count, mean, stddev, min, max) of the per-user features.
df_agg.describe().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|          user_id|      reorder_mean|        item_count|       order_count|
+-------+-----------------+------------------+------------------+------------------+
|  count|            15863|             15863|             15863|             15863|
|   mean|         103104.0|15.479070041625368|10.119160092850482|16.536090272962237|
| stddev|59532.17393981176|6.9135648408786565| 5.935174437457448| 16.72254068797023|
|    min|                1|               0.4|               1.0|                 4|
|    max|           206207|              30.0|56.166666666666664|               100|
+-------+-----------------+------------------+------------------+------------------+



In [None]:
# Number of users per weekend_prefer value, smallest group first.
df_agg.groupBy('weekend_prefer').count().orderBy('count').show()

+--------------+-----+
|weekend_prefer|count|
+--------------+-----+
|          true| 1438|
|         false|14425|
+--------------+-----+



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf


# MinMaxScaler outputs a one-element vector per row; this UDF takes that single
# value out of the vector and rounds it to 3 decimals so it becomes a plain double.
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

# Min-max scale each numeric feature on its own and add it as "<name>_Scaled".
# NOTE: df_agg is extended in place, so this cell must run once per fresh df_agg;
# a second run would find the "_Scaled" columns already present.
for feature_col in ["reorder_mean", "item_count", "order_count"]:
    vect_col = feature_col + "_Vect"
    scaled_col = feature_col + "_Scaled"

    # MinMaxScaler works on vector columns, so wrap the scalar column in a vector first.
    pipeline = Pipeline(stages=[
        VectorAssembler(inputCols=[feature_col], outputCol=vect_col),
        MinMaxScaler(inputCol=vect_col, outputCol=scaled_col),
    ])

    # Fit and transform once per feature. The previous version fitted the same
    # pipeline a second time into an unused frame, doubling the Spark work.
    df_agg = (
        pipeline.fit(df_agg)
        .transform(df_agg)
        .withColumn(scaled_col, unlist(scaled_col))
        .drop(vect_col)
    )

In [None]:
# Persist the frame that now carries the *_Scaled columns; the cell displays persist()'s return value.
df_agg.persist()

DataFrame[user_id: int, reorder_mean: double, item_count: double, order_count: bigint, weekend_prefer: boolean, reorder_mean_Scaled: double, item_count_Scaled: double, order_count_Scaled: double]

In [None]:
# Print the first rows including the scaled feature columns.
df_agg.show()

+-------+------------------+------------------+-----------+--------------+-------------------+-----------------+------------------+
|user_id|      reorder_mean|        item_count|order_count|weekend_prefer|reorder_mean_Scaled|item_count_Scaled|order_count_Scaled|
+-------+------------------+------------------+-----------+--------------+-------------------+-----------------+------------------+
|   7554|            8.8125|17.764705882352942|         17|         false|              0.284|            0.304|             0.135|
|  10817|10.464285714285714|13.689655172413794|         29|         false|               0.34|             0.23|              0.26|
|  57201|              24.4|              10.0|          6|         false|              0.811|            0.163|             0.021|
|    833|21.181818181818183|              19.5|         12|         false|              0.702|            0.335|             0.083|
|  23271|15.545454545454545|16.818181818181817|         12|         false|  

In [None]:
%%time
# Summary statistics including the scaled columns (timed by %%time).
df_agg.describe().show()

+-------+-----------------+------------------+------------------+------------------+-------------------+------------------+-------------------+
|summary|          user_id|      reorder_mean|        item_count|       order_count|reorder_mean_Scaled| item_count_Scaled| order_count_Scaled|
+-------+-----------------+------------------+------------------+------------------+-------------------+------------------+-------------------+
|  count|            15863|             15863|             15863|             15863|              15863|             15863|              15863|
|   mean|         103104.0|15.479070041625368|10.119160092850482|16.536090272962237|  0.509429174809305|0.1653056798840067| 0.1305359641934061|
| stddev|59532.17393981176|6.9135648408786565| 5.935174437457448| 16.72254068797023|0.23357178144738938| 0.107586148711947|0.17422188616239306|
|    min|                1|               0.4|               1.0|                 4|                0.0|               0.0|             

In [None]:
%%time
# Combine the weekend flag and the three scaled features into one "features" vector column.
vecAssembler = VectorAssembler(inputCols=["weekend_prefer", "reorder_mean_Scaled", "item_count_Scaled", "order_count_Scaled"], outputCol="features")
new_df = vecAssembler.transform(df_agg)
# Persist the assembled frame before it is reused for fitting and prediction below.
new_df.persist()
new_df.show()

+-------+------------------+------------------+-----------+--------------+-------------------+-----------------+------------------+--------------------+
|user_id|      reorder_mean|        item_count|order_count|weekend_prefer|reorder_mean_Scaled|item_count_Scaled|order_count_Scaled|            features|
+-------+------------------+------------------+-----------+--------------+-------------------+-----------------+------------------+--------------------+
|   7554|            8.8125|17.764705882352942|         17|         false|              0.284|            0.304|             0.135|[0.0,0.284,0.304,...|
|  10817|10.464285714285714|13.689655172413794|         29|         false|               0.34|             0.23|              0.26|[0.0,0.34,0.23,0.26]|
|  57201|              24.4|              10.0|          6|         false|              0.811|            0.163|             0.021|[0.0,0.811,0.163,...|
|    833|21.181818181818183|              19.5|         12|         false|        

In [None]:
%%time
from pyspark.ml.clustering import KMeans

# K-means with 5 clusters and a fixed seed, fitted on the "features" column only.
kmeans = KMeans(k=5, seed=1)
model = kmeans.fit(new_df.select('features'))

CPU times: user 1.17 s, sys: 307 ms, total: 1.47 s
Wall time: 1min 54s


In [None]:
%%time
# Apply the fitted model to the full frame, then collect the result into a pandas DataFrame.
transformed = model.transform(new_df)
df_cluster = transformed.toPandas()

CPU times: user 576 ms, sys: 22.7 ms, total: 599 ms
Wall time: 4.62 s


In [None]:
# Mean of every clustering feature per predicted cluster.
# The columns are selected with a list: indexing a GroupBy with several bare keys
# (a tuple) was deprecated and is rejected by pandas 2.x.
df_cluster.groupby("prediction")[["weekend_prefer", "reorder_mean_Scaled", "item_count_Scaled", "order_count_Scaled"]].mean()

  df_cluster.groupby("prediction")["weekend_prefer", "reorder_mean_Scaled", "item_count_Scaled", "order_count_Scaled"].mean()


Unnamed: 0_level_0,weekend_prefer,reorder_mean_Scaled,item_count_Scaled,order_count_Scaled
prediction,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,0.0,0.829121,0.16062,0.034342
1,0.0,0.19657,0.166578,0.595069
2,1.0,0.546132,0.17334,0.094446
3,0.0,0.310304,0.164462,0.159362
4,0.0,0.553168,0.166628,0.077244
