In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count,sum, when
spark = SparkSession.builder.appName("Generate Features").getOrCreate()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
orders = spark.read.csv("jupyter_workspace/archive/orders/orders.csv",header=  True )
print("Schema of the CSV file:")
orders.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Schema of the CSV file:
root
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: string (nullable = true)
 |-- order_dow: string (nullable = true)
 |-- order_hour_of_day: string (nullable = true)
 |-- days_since_prior_order: string (nullable = true)

In [9]:
order_products = spark.read.csv("jupyter_workspace/archive/order_products/",header=  True )
print("Schema of the CSV file:")
order_products.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Schema of the CSV file:
root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- add_to_cart_order: string (nullable = true)
 |-- reordered: string (nullable = true)

In [10]:
orders.createOrReplaceTempView("orders")
order_products.createOrReplaceTempView("order_products")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
order_products_prior = spark.sql("SELECT a.*, \
            b.product_id, \
            b.add_to_cart_order, \
            b.reordered \
            FROM orders a \
            JOIN order_products b \
            ON a.order_id = b.order_id \
            WHERE a.eval_set = 'prior' ") 

order_products_prior.coalesce(5).write.mode("overwrite").parquet("jupyter_workspace/archive/output/")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
orders.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329|      1|   prior|           1|        2|               08|                  null|
| 2398795|      1|   prior|           2|        3|               07|                  15.0|
|  473747|      1|   prior|           3|        3|               12|                  21.0|
| 2254736|      1|   prior|           4|        4|               07|                  29.0|
|  431534|      1|   prior|           5|        4|               15|                  28.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+
only showing top 5 rows

In [18]:
user_features_1 = spark.sql("""
    SELECT 
        a.user_id, 
        MAX(a.order_number) AS max_order_number, 
        SUM(a.days_since_prior_order) AS total_days_since_prior_order, 
        ROUND (AVG(a.days_since_prior_order), 2) AS avg_days_since_prior_order
    FROM orders a
    GROUP BY a.user_id
""")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
user_features_1.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------+----------------------------+--------------------------+
|user_id|max_order_number|total_days_since_prior_order|avg_days_since_prior_order|
+-------+----------------+----------------------------+--------------------------+
| 100005|               9|                       270.0|                      15.0|
| 100014|               4|                        66.0|                      22.0|
| 100025|               4|                        90.0|                      30.0|
| 100040|               9|                       313.0|                     24.08|
| 100072|               9|                       259.0|                      8.93|
+-------+----------------+----------------------------+--------------------------+
only showing top 5 rows

In [22]:
order_products_prior = spark.read.parquet("jupyter_workspace/archive/output/" )
order_products_prior.createOrReplaceTempView("order_products_prior")
order_products_prior.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: string (nullable = true)
 |-- order_dow: string (nullable = true)
 |-- order_hour_of_day: string (nullable = true)
 |-- days_since_prior_order: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- add_to_cart_order: string (nullable = true)
 |-- reordered: string (nullable = true)

In [28]:
user_features_2 = spark.sql("""
    SELECT 
        opp.user_id, 
        COUNT(opp.product_id) AS total_number_products, 
        COUNT(DISTINCT opp.product_id) AS total_number_distinct_products,
        ROUND(SUM(opp.reordered) / COUNT(opp.order_id),2) AS reorder_ratio
    FROM order_products_prior opp
    GROUP BY opp.user_id
""")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
user_features_2.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------------------+------------------------------+-------------+
|user_id|total_number_products|total_number_distinct_products|reorder_ratio|
+-------+---------------------+------------------------------+-------------+
| 193548|                  242|                            89|         0.63|
|  42370|                   40|                            35|         0.13|
| 203842|                  292|                           111|         0.62|
| 143593|                  159|                            60|         0.62|
|  71627|                  139|                            84|          0.4|
+-------+---------------------+------------------------------+-------------+
only showing top 5 rows

In [31]:
up_features = spark.sql("""
    SELECT
        opp.user_id,
        opp.product_id,
        COUNT(opp.user_id) AS total_number_orders, 
        MIN(opp.order_number) AS min_order_number,
        MAX(opp.order_number) AS max_order_number,
        AVG(add_to_cart_order) AS average_add_cart_order
    FROM order_products_prior opp
    GROUP BY opp.user_id, opp.product_id
""")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
up_features.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+-------------------+----------------+----------------+----------------------+
|user_id|product_id|total_number_orders|min_order_number|max_order_number|average_add_cart_order|
+-------+----------+-------------------+----------------+----------------+----------------------+
|     10|     13212|                  1|               2|               2|                  39.0|
|     10|     23541|                  2|               4|               5|                  18.5|
|     10|     44359|                  1|               3|               3|                   6.0|
|    100|      6975|                  1|               3|               3|                   4.0|
|   1000|     29987|                  2|               2|               3|                  13.5|
+-------+----------+-------------------+----------------+----------------+----------------------+
only showing top 5 rows

In [48]:
prd_features = spark.sql("""
    WITH product_sequences AS (
        SELECT
            opp.user_id,
            opp.order_id,
            opp.product_id,
            opp.reordered,
            ROW_NUMBER() OVER (PARTITION BY opp.user_id, opp.product_id ORDER BY opp.order_id) AS product_seq_time
        FROM order_products_prior opp
    )
    SELECT 
        product_id,
        COUNT(reordered),
        SUM(reordered),
        product_seq_time
    FROM product_sequences
    WHERE product_seq_time IN (1, 2)     
    GROUP BY product_id, product_seq_time
    ORDER BY product_id

""")
prd_features.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------------+--------------+----------------+
|product_id|count(reordered)|sum(reordered)|product_seq_time|
+----------+----------------+--------------+----------------+
|         1|             276|         185.0|               2|
|         1|             716|         198.0|               1|
|        10|             399|         277.0|               2|
|        10|            1268|         245.0|               1|
|       100|             192|          48.0|               1|
|       100|              80|          56.0|               2|
|      1000|             421|         262.0|               2|
|      1000|            1545|         271.0|               1|
|     10000|               7|           1.0|               1|
|     10000|               2|           1.0|               2|
+----------+----------------+--------------+----------------+
only showing top 10 rows

In [50]:
test = spark.sql("""
        SELECT
            opp.user_id,
            opp.order_id,
            opp.product_id,
            opp.reordered,
            ROW_NUMBER() OVER (PARTITION BY opp.user_id, opp.product_id ORDER BY opp.order_id) AS product_seq_time
        FROM order_products_prior opp 
        ORDER BY product_id
        """)
test.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+----------+---------+----------------+
|user_id|order_id|product_id|reordered|product_seq_time|
+-------+--------+----------+---------+----------------+
| 119163| 2517311|         1|        0|               1|
| 181697| 1817414|         1|        1|               7|
| 159791|  491203|         1|        0|               1|
| 112110| 1673704|         1|        0|               1|
| 163386| 1870503|         1|        1|               1|
| 181697| 1568474|         1|        1|               2|
| 181697|  159230|         1|        1|               3|
| 105870|  679521|         1|        0|               1|
| 181697| 1620454|         1|        1|               4|
| 163386| 2542360|         1|        0|               2|
+-------+--------+----------+---------+----------------+
only showing top 10 rows