In [2]:
from pyspark.sql import SparkSession, Window
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, log, row_number

# Build (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("Product Recommendation")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

# Load the preprocessed dataset and print its schema as a quick sanity check.
df = spark.read.parquet("/home/m1nhd3n/Works/DataEngineer/product_recommendations/data/preprocess/transformed_data.parquet")
df.printSchema()

25/02/20 15:20:26 WARN Utils: Your hostname, m1nhd3n resolves to a loopback address: 127.0.1.1; using 192.168.1.158 instead (on interface wlp0s20f3)
25/02/20 15:20:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/20 15:20:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- ncodpers: integer (nullable = true)
 |-- month_idx: integer (nullable = true)
 |-- product_vec: vector (nullable = true)
 |-- feat_vec: vector (nullable = true)



In [3]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# One window per customer, rows ordered by month index.
windowSpec = Window.partitionBy(F.col("ncodpers")).orderBy(F.col("month_idx"))

# Previous row's product vector within the window; NULL on each customer's first row.
df = df.withColumn(
    "prev_product_vec",
    F.lag(F.col("product_vec"), 1).over(windowSpec),
)

In [4]:
from pyspark.ml.linalg import DenseVector, VectorUDT, Vectors
from pyspark.sql.functions import col, udf
import numpy as np


@udf(returnType=VectorUDT())
def subtract_vector_udf(vec1, vec2):
    """Return the positions that are set in ``vec1`` but not in ``vec2``.

    The element-wise difference ``vec1 - vec2`` is clipped to ``[0, 1]`` so
    that negative entries (present in ``vec2`` but not in ``vec1``) become 0.

    Args:
        vec1: Current vector (anything exposing ``toArray()``), or ``None``.
        vec2: Previous vector (anything exposing ``toArray()``), or ``None``.

    Returns:
        A sparse vector holding the positive entries of the clipped
        difference, or ``None`` when either input is ``None`` or when the
        clipped difference has no non-zero entry.
    """
    # Both columns are nullable; without guarding vec1 as well, a null current
    # vector next to a non-null previous one would raise AttributeError on
    # ``.toArray()`` inside the executor and fail the whole job.
    if vec1 is None or vec2 is None:
        return None

    vec_diff = vec1.toArray() - vec2.toArray()
    new_prod_vec = np.clip(vec_diff, 0, 1)
    indices = np.nonzero(new_prod_vec)[0]

    # No positive entry: emit NULL rather than an all-zero vector so that
    # downstream ``isNotNull()`` filters keep only rows with something new.
    if len(indices) == 0:
        return None
    values = new_prod_vec[indices]

    return Vectors.sparse(len(new_prod_vec), indices, values)

# Compare each row's product vector with the previous one from the same window.
df = df.withColumn(
    "new_product_vec",
    subtract_vector_udf(df["product_vec"], df["prev_product_vec"]),
)

In [5]:
# Target for a row = the "new_product_vec" of the following row in the same window.
next_new_products = F.lead("new_product_vec", 1).over(windowSpec)
df = df.withColumn("target_product_vec", next_new_products)

In [6]:
# Keep only the rows that have a non-null target.
actual_buy_df = df.filter(F.col("target_product_vec").isNotNull())

In [7]:
# Unique customer ids that appear in at least one row with a target.
actual_buying_cust = actual_buy_df.select(F.col("ncodpers")).dropDuplicates()

In [8]:
# Materialise the distinct customer ids on the driver as a list of Row objects.
# NOTE(review): this pulls every id into driver memory; fine for a few hundred
# thousand rows, but it does not scale with the number of customers.
ab_custs = actual_buying_cust.collect()

                                                                                

In [9]:
# Unwrap each collected Row into its plain customer id.
cust_ids = [row["ncodpers"] for row in ab_custs]

# Display how many distinct customers were collected.
len(cust_ids)

187219

In [10]:
# Keep the full history (all months) of every customer that has at least one
# row with a target. A left-semi join against the distinct-id DataFrame keeps
# the filtering inside Spark; the previous `isin(cust_ids)` embedded the whole
# collected Python list as literals in the query plan, which inflates the
# serialized task and triggers "Broadcasting large task binary" warnings.
actual_buy_df = df.join(actual_buying_cust, on="ncodpers", how="left_semi")

In [11]:
# Trigger evaluation and print the first rows (show() defaults to 20) to
# eyeball the lag / new-product / target columns.
actual_buy_df.show()

25/02/20 15:23:40 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
25/02/20 15:23:48 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
25/02/20 15:24:24 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
                                                                                

+--------+---------+--------------------+--------------------+--------------------+---------------+------------------+
|ncodpers|month_idx|         product_vec|            feat_vec|    prev_product_vec|new_product_vec|target_product_vec|
+--------+---------+--------------------+--------------------+--------------------+---------------+------------------+
|   15923|        1|(24,[4,12,21,22,2...|(17,[4,5,10,12,14...|                NULL|           NULL|              NULL|
|   15923|        2|(24,[4,12,21,22,2...|(17,[4,5,10,12,14...|(24,[4,12,21,22,2...|           NULL|              NULL|
|   15923|        3|(24,[4,12,21,22,2...|(17,[4,5,10,12,14...|(24,[4,12,21,22,2...|           NULL|   (24,[18],[1.0])|
|   15923|        4|(24,[4,12,18,21,2...|(17,[4,5,10,12,14...|(24,[4,12,21,22,2...|(24,[18],[1.0])|              NULL|
|   15923|        5|(24,[4,12,18,21,2...|(17,[4,5,10,12,14...|(24,[4,12,18,21,2...|           NULL|              NULL|
|   15923|        6|(24,[4,12,18,21,2...|(17,[4,

In [13]:
# Persist the dataset with the target column. The default save mode
# ("errorifexists") makes this cell fail on every re-run once the directory
# exists, so the mode is set explicitly to keep the notebook re-runnable.
# NOTE(review): "overwrite" replaces any previous output at this path.
actual_buy_df.write.mode("overwrite").parquet("/home/m1nhd3n/Works/DataEngineer/product_recommendations/data/preprocess/created_target_col")

25/02/20 15:25:28 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
25/02/20 15:25:34 WARN DAGScheduler: Broadcasting large task binary with size 5.5 MiB
25/02/20 15:26:11 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
                                                                                