In [18]:
pip install pyspark



In [19]:
import pyspark
import pandas as pd

In [20]:
from pyspark.sql import SparkSession

In [21]:
spark = SparkSession.builder.getOrCreate()

In [22]:
customerDF = spark.read.csv("customer_data.csv",header = True,inferSchema = True)
customerDF.show()

+-----------+------------------+---+----------------+
|customer_id|              name|age|purchase_history|
+-----------+------------------+---+----------------+
|          1|          John Doe| 32|               1|
|          2|        Jane Smith| 45|               3|
|          3|   Michael Johnson| 28|               2|
|          4|       Emily Davis| 50|               1|
|          5|     Robert Wilson| 35|               3|
|          6|       Emma Taylor| 22|               2|
|          7|    David Anderson| 40|               1|
|          8|       Sarah Brown| 29|               3|
|          9|  Matthew Thompson| 37|               2|
|         10|   Olivia Martinez| 33|               1|
|         11|    William Garcia| 26|               3|
|         12|   Sophia Robinson| 48|               2|
|         13|         James Lee| 31|               1|
|         14|Isabella Rodriguez| 39|               3|
|         15|  Joseph Hernandez| 42|               2|
|         16|      Emily Wri

In [23]:
productDF = spark.read.csv("product_data.csv",header = True)
productDF.show()

+----------+----------+-----------+-----+
|product_id|      name|   category|price|
+----------+----------+-----------+-----+
|         1|    Laptop|Electronics| 1200|
|         2|     Shirt|   Clothing|   30|
|         3|      Book|      Books|   15|
|         4|Headphones|Electronics|  100|
|         5|     Dress|   Clothing|   50|
|         6|Smartphone|Electronics|  800|
|         7|     Jeans|   Clothing|   40|
|         8|    Tablet|Electronics|  500|
|         9|     Watch|   Clothing|  100|
|        10|   Speaker|Electronics|   80|
|        11|     Skirt|   Clothing|   35|
|        12|     Novel|      Books|   20|
|        13|        TV|Electronics| 1500|
|        14|  Sneakers|   Clothing|   60|
|        15|   Headset|Electronics|   70|
|        16|    Shorts|   Clothing|   25|
|        17|     Mouse|Electronics|   20|
|        18|       Hat|   Clothing|   15|
|        19|  Keyboard|Electronics|   40|
|        20|     Pants|   Clothing|   45|
+----------+----------+-----------

In [24]:
#customerDF.filter((productDF["category"] == 'Electronics') & (customerDF["purchase_history"] == productDF["product_id"])).show()

df = customerDF.join(productDF, customerDF["purchase_history"] == productDF["product_id"]).filter(productDF["category"] == 'Electronics')
df.show()

+-----------+---------------+---+----------------+----------+------+-----------+-----+
|customer_id|           name|age|purchase_history|product_id|  name|   category|price|
+-----------+---------------+---+----------------+----------+------+-----------+-----+
|          1|       John Doe| 32|               1|         1|Laptop|Electronics| 1200|
|          4|    Emily Davis| 50|               1|         1|Laptop|Electronics| 1200|
|          7| David Anderson| 40|               1|         1|Laptop|Electronics| 1200|
|         10|Olivia Martinez| 33|               1|         1|Laptop|Electronics| 1200|
|         13|      James Lee| 31|               1|         1|Laptop|Electronics| 1200|
|         16|   Emily Wright| 27|               1|         1|Laptop|Electronics| 1200|
|         19|  Oliver Thomas| 43|               1|         1|Laptop|Electronics| 1200|
+-----------+---------------+---+----------------+----------+------+-----------+-----+



In [40]:
from pyspark.ml.feature import StringIndexer

In [43]:

indexer = StringIndexer(inputCols=["category"] , outputCols=["categoryIndex"])
df_r = indexer.fit(productDF).transform(productDF)
df_r.show()


+----------+----------+-----------+-----+-------------+
|product_id|      name|   category|price|categoryIndex|
+----------+----------+-----------+-----+-------------+
|         1|    Laptop|Electronics| 1200|          1.0|
|         2|     Shirt|   Clothing|   30|          0.0|
|         3|      Book|      Books|   15|          2.0|
|         4|Headphones|Electronics|  100|          1.0|
|         5|     Dress|   Clothing|   50|          0.0|
|         6|Smartphone|Electronics|  800|          1.0|
|         7|     Jeans|   Clothing|   40|          0.0|
|         8|    Tablet|Electronics|  500|          1.0|
|         9|     Watch|   Clothing|  100|          0.0|
|        10|   Speaker|Electronics|   80|          1.0|
|        11|     Skirt|   Clothing|   35|          0.0|
|        12|     Novel|      Books|   20|          2.0|
|        13|        TV|Electronics| 1500|          1.0|
|        14|  Sneakers|   Clothing|   60|          0.0|
|        15|   Headset|Electronics|   70|       

In [46]:
customerProductDF = customerDF.join(productDF , customerDF['purchase_history'] == productDF['product_id'])
customerProductDF.show()

+-----------+------------------+---+----------------+----------+------+-----------+-----+
|customer_id|              name|age|purchase_history|product_id|  name|   category|price|
+-----------+------------------+---+----------------+----------+------+-----------+-----+
|          1|          John Doe| 32|               1|         1|Laptop|Electronics| 1200|
|          2|        Jane Smith| 45|               3|         3|  Book|      Books|   15|
|          3|   Michael Johnson| 28|               2|         2| Shirt|   Clothing|   30|
|          4|       Emily Davis| 50|               1|         1|Laptop|Electronics| 1200|
|          5|     Robert Wilson| 35|               3|         3|  Book|      Books|   15|
|          6|       Emma Taylor| 22|               2|         2| Shirt|   Clothing|   30|
|          7|    David Anderson| 40|               1|         1|Laptop|Electronics| 1200|
|          8|       Sarah Brown| 29|               3|         3|  Book|      Books|   15|
|         

In [53]:
from pyspark.sql.functions import when

condition = customerProductDF['age'] < 30
original_price = customerProductDF['price']

customerProductDF = customerProductDF.withColumn('discounted_price',when(condition,original_price).otherwise(original_price * 0.9))
customerProductDF.show()

+-----------+------------------+---+----------------+----------+------+-----------+-----+----------------+
|customer_id|              name|age|purchase_history|product_id|  name|   category|price|discounted_price|
+-----------+------------------+---+----------------+----------+------+-----------+-----+----------------+
|          1|          John Doe| 32|               1|         1|Laptop|Electronics| 1200|          1080.0|
|          2|        Jane Smith| 45|               3|         3|  Book|      Books|   15|            13.5|
|          3|   Michael Johnson| 28|               2|         2| Shirt|   Clothing|   30|              30|
|          4|       Emily Davis| 50|               1|         1|Laptop|Electronics| 1200|          1080.0|
|          5|     Robert Wilson| 35|               3|         3|  Book|      Books|   15|            13.5|
|          6|       Emma Taylor| 22|               2|         2| Shirt|   Clothing|   30|              30|
|          7|    David Anderson| 40| 