In [None]:
import os

from pyspark.sql import SparkSession

# Location of the PostgreSQL JDBC driver jar — point this at your local copy.
jar_path = "path/to/postgresql-42.x.x.jar"

# Start (or reuse) a local SparkSession that has the JDBC driver on its classpath.
session_builder = (
    SparkSession.builder
    .appName("TestSession")
    .master("local[*]")
    .config("spark.jars", jar_path)
)
spark = session_builder.getOrCreate()

print(spark.version)



3.5.2


In [None]:
# Connection settings. Credentials are read from environment variables so they are
# never saved into the notebook; the placeholders remain as fallbacks to fill in.
jdbc_url = os.environ.get("PG_JDBC_URL", "jdbc:postgresql://<host>:<port>/<database>")
db_properties = {
    "user": os.environ.get("PG_USER", "<username>"),
    "password": os.environ.get("PG_PASSWORD", "<password>"),
    "driver": "org.postgresql.Driver",
}

# Read the `shopping_trends` table over JDBC. The connection properties
# (user / password / driver) are passed straight from `db_properties`.
df = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "shopping_trends")
    .options(**db_properties)
    .load()
)

df.show()

+---+---+----------+-----------+------+-------------+----+---------+------+------+
| id|age|      item|   category|amount|     location|size|    color|season|rating|
+---+---+----------+-----------+------+-------------+----+---------+------+------+
|  1| 55|    Blouse|   Clothing|    53|     Kentucky|   L|     Gray|Winter|   3.1|
|  2| 19|   Sweater|   Clothing|    64|        Maine|   L|   Maroon|Winter|   3.1|
|  3| 50|     Jeans|   Clothing|    73|Massachusetts|   S|   Maroon|Spring|   3.1|
|  4| 21|   Sandals|   Footwear|    90| Rhode Island|   M|   Maroon|Spring|   3.5|
|  5| 45|    Blouse|   Clothing|    49|       Oregon|   M|Turquoise|Spring|   2.7|
|  6| 46|  Sneakers|   Footwear|    20|      Wyoming|   M|    White|Summer|   2.9|
|  7| 63|     Shirt|   Clothing|    85|      Montana|   M|     Gray|  Fall|   3.2|
|  8| 27|    Shorts|   Clothing|    34|    Louisiana|   L| Charcoal|Winter|   3.2|
|  9| 26|      Coat|  Outerwear|    97|West Virginia|   L|   Silver|Summer|   2.6|
| 10

In [3]:
from pyspark.sql.functions import col

# Keep only what the recommender uses: the user key (`id`), the item name
# (indexed in the next cell) and the numeric `rating`.
model_columns = ["id", "item", "rating"]
rating = df.select(*[col(column_name) for column_name in model_columns])

In [None]:
from pyspark.ml.feature import StringIndexer, IndexToString

# Encode the string `item` column as a numeric `itemId` so it can serve as ALS's item column.
indexer = StringIndexer(inputCol="item", outputCol="itemId")
indexer_model = indexer.fit(rating)
df_indexed = indexer_model.transform(rating)

# Preview the item -> itemId mapping.
df_indexed.select("item", "itemId").show(5)

+-------+------+
|   item|itemId|
+-------+------+
| Blouse|   0.0|
|Sweater|   5.0|
|  Jeans|  24.0|
|Sandals|  10.0|
| Blouse|   0.0|
+-------+------+
only showing top 5 rows



In [9]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import explode, col
 
 
# Build the recommendation model using ALS. A fixed seed makes the (randomly
# initialised) factorisation, and therefore the recommendations below,
# reproducible across kernel restarts.
# NOTE(review): `id` looks like a per-row identifier (1, 2, 3, ... in the preview),
# so each "user" may have only a single rating — confirm this is the intended user key.
ALS_SEED = 42
N_RECOMMENDATIONS = 4

als = ALS(userCol="id", itemCol="itemId", ratingCol="rating", seed=ALS_SEED)
model = als.fit(df_indexed)

# Top-N items for every user: one row per user holding an array of
# (itemId, rating) structs.
user_recommendations = model.recommendForAllUsers(N_RECOMMENDATIONS)

user_recommendations.show(100)

+---+--------------------+
| id|     recommendations|
+---+--------------------+
|  1|[{0, 2.973279}, {...|
|  3|[{24, 3.0111287},...|
|  6|[{20, 2.8043518},...|
| 12|[{14, 4.737403}, ...|
| 13|[{8, 4.3657036}, ...|
| 16|[{12, 2.7051325},...|
| 20|[{2, 3.1749861}, ...|
| 22|[{2, 3.94468}, {1...|
| 26|[{17, 3.4608638},...|
| 27|[{1, 3.4888885}, ...|
| 28|[{14, 4.8340836},...|
| 31|[{1, 4.5549374}, ...|
| 34|[{2, 3.1749861}, ...|
| 40|[{2, 4.040891}, {...|
| 44|[{13, 3.5970604},...|
| 47|[{3, 2.7944472}, ...|
| 48|[{1, 4.3611107}, ...|
| 52|[{12, 4.5407577},...|
| 53|[{19, 2.5040898},...|
| 54|[{12, 3.7678633},...|
| 57|[{22, 4.532401}, ...|
| 65|[{21, 4.675359}, ...|
| 76|[{13, 2.6248817},...|
| 78|[{4, 3.1937559}, ...|
| 81|[{10, 3.1924167},...|
| 85|[{8, 4.2686887}, ...|
| 86|[{9, 4.3229146}, ...|
| 91|[{15, 2.819289}, ...|
| 93|[{6, 4.272446}, {...|
| 94|[{5, 4.3422003}, ...|
| 96|[{20, 3.6746676},...|
|101|[{9, 2.5937486}, ...|
|103|[{11, 4.6368914},...|
|108|[{20, 2.997755}, ...|
|

In [10]:
# Explode the recommendations array so each (user, recommended item) pair is its own
# row, then pull the item index and predicted rating out of the struct.
user_recommendations_exploded = (
    user_recommendations
    .withColumn("recommendation", explode(col("recommendations")))
    .select(
        "id",
        col("recommendation.itemId").alias("itemId"),
        col("recommendation.rating").alias("rating"),
    )
)

# Map itemId back to item names using the exact labels the fitted StringIndexer
# attached to `df_indexed` (the data ALS was trained on). Re-fitting the indexer
# here would re-read the un-cached JDBC source and could yield a different
# index -> label mapping if the table changed in the meantime.
item_labels = df_indexed.schema["itemId"].metadata["ml_attr"]["vals"]
indexer_reverse = IndexToString(inputCol="itemId", outputCol="item", labels=item_labels)

# Apply the reverse transformation to attach the item names.
user_recommendations_with_names = indexer_reverse.transform(user_recommendations_exploded)

user_recommendations_with_names.show(100, truncate=False)

+---+------+---------+----------+
|id |itemId|rating   |item      |
+---+------+---------+----------+
|1  |0     |2.973279 |Blouse    |
|1  |16    |1.9589967|Handbag   |
|1  |12    |1.9197434|Skirt     |
|1  |3     |1.6445383|Shirt     |
|3  |24    |3.0111287|Jeans     |
|3  |3     |1.3440131|Shirt     |
|3  |17    |1.3370737|Hoodie    |
|3  |8     |1.273194 |Coat      |
|6  |20    |2.8043518|Sneakers  |
|6  |23    |1.3171581|Gloves    |
|6  |24    |1.312065 |Jeans     |
|6  |9     |1.1314882|Sunglasses|
|12 |14    |4.737403 |Shorts    |
|12 |5     |2.7668684|Sweater   |
|12 |2     |2.477179 |Pants     |
|12 |7     |2.304946 |Belt      |
|13 |8     |4.3657036|Coat      |
|13 |10    |2.5875366|Sandals   |
|13 |3     |2.1889906|Shirt     |
|13 |19    |1.9658369|T-shirt   |
|16 |12    |2.7051325|Skirt     |
|16 |1     |1.9955331|Jewelry   |
|16 |18    |1.6169804|Shoes     |
|16 |0     |1.4371858|Blouse    |
|20 |2     |3.1749861|Pants     |
|20 |14    |1.9045899|Shorts    |
|20 |12    |1.

In [16]:
# Stop the SparkSession to release its resources once the analysis is finished.
spark.stop()