In [10]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session 8c58be73-4235-4d50-acff-b48a756154e1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 8c58be73-4235-4d50-acff-b48a756154e1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session 8c58be73-4235-4d50-acff-b48a756154e1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 8c58be73-4235-4d50-acff-b48a756154e1.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [12]:
glueContext = GlueContext(SparkContext.getOrCreate())

# Access the existing SparkSession
spark = glueContext.spark_session

# Define the S3 path where your data is stored
s3_path = "s3://my-data-source-bucket/ulta_perfume_data.csv"

# Load data from S3 as a Spark DataFrame
df = spark.read.csv(s3_path, header=True, inferSchema=True)

# Show the schema of the DataFrame
df.printSchema()

root
 |-- Product Name: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Reviews: string (nullable = true)


In [15]:
from pyspark.sql.functions import regexp_extract, col

# Use regexp_extract to extract the rating and number of reviews
df = df.withColumn("Rating", regexp_extract(col("Reviews"), r'(\d+\.\d) out of 5 stars', 1)) \
       .withColumn("Number of Reviews", regexp_extract(col("Reviews"), r'; (\d+) reviews', 1).cast("int"))

# Drop the original 'Reviews' column
df = df.drop("Reviews")

# Show the DataFrame
df.show(truncate=False)

+---------------------------------------+-----------------------+------+-----------------+
|Product Name                           |Brand                  |Rating|Number of Reviews|
+---------------------------------------+-----------------------+------+-----------------+
|Eilish Eau de Parfum                   |Billie Eilish          |4.5   |3431             |
|COCO MADEMOISELLE Eau de Parfum Spray  |CHANEL                 |4.5   |362              |
|Donna Born In Roma Eau de Parfum       |Valentino              |4.8   |6034             |
|Cloud Eau de Parfum                    |Ariana Grande          |4.3   |2772             |
|Miss Dior Eau de Parfum                |Dior                   |4.6   |9704             |
|Born in Roma Donna Perfume Gift Set    |Valentino              |4.8   |6034             |
|Her Eau de Parfum                      |Burberry               |4.5   |5046             |
|CHANCE EAU TENDRE Eau de Parfum Spray  |CHANEL                 |4.4   |268              |

In [16]:
# Filter out rows where Rating is NULL or empty
df_clean = df.filter(df['Rating'].isNotNull() & (df['Rating'] != ''))

# Sort by Rating and number of Reviews in descending/ascending order and show the top 10
top_10_by_rating = df_clean.orderBy(F.col("Rating"), ascending=False).limit(10)
bottom_10_by_rating = df_clean.orderBy(F.col("Rating"), ascending=True).limit(10)
top_10_by_reviews = df_clean.orderBy(F.col("Number of Reviews"), ascending=False).limit(10)
bottom_10_by_reviews = df_clean.orderBy(F.col("Number of Reviews"), ascending=True).limit(10)

# Show the results
print("Top 10 Perfumes Based on Rating (Ignoring Empty Ratings):")
top_10_by_rating.show(truncate=False)

print("Bottom 10 Perfumes Based on Rating (Ignoring Empty Ratings):")
bottom_10_by_rating.show(truncate=False)

print("Top 10 Perfumes Based on Number of Reviews (Ignoring Empty Ratings):")
top_10_by_reviews.show(truncate=False)

print("Bottom 10 Perfumes Based on Number of Reviews (Ignoring Empty Ratings):")
bottom_10_by_reviews.show(truncate=False)


Top 10 Perfumes Based on Rating (Ignoring Empty Ratings):
+-------------------------------------------+------------------+------+-----------------+
|Product Name                               |Brand             |Rating|Number of Reviews|
+-------------------------------------------+------------------+------+-----------------+
|Wild Poppy Eau de Parfum                   |NEST New York     |4.9   |68               |
|J'adore Eau Lumière Eau de Toilette        |Dior              |4.8   |100              |
|Pleasures Eau de Parfum Perfume Spray      |Estée Lauder      |4.8   |699              |
|Mini Flowerbomb Perfume Set                |Viktor&Rolf       |4.8   |128              |
|Donna Born In Roma Eau de Parfum           |Valentino         |4.8   |6034             |
|Libre Flower & Flames Eau de Parfum Florale|Yves Saint Laurent|4.8   |1762             |
|Libre Eau de Parfum Intense                |Yves Saint Laurent|4.8   |1018             |
|Bright Crystal Absolu Eau de Parfum      

In [17]:
# Calculate the weighted rating (rating * number of reviews) for each row
df_with_weights = df.withColumn(
    "Weighted Rating", 
    F.col("Rating") * F.col("Number of Reviews")
)

# Group by Brand and calculate the total number of reviews and the weighted sum of ratings
brand_weighted_avg_df = df_with_weights.groupBy("Brand").agg(
    F.sum("Weighted Rating").alias("Total Weighted Rating"),  # Total weighted rating for each brand
    F.sum("Number of Reviews").alias("Total Number of Reviews")  # Total reviews for each brand
)

# Calculate the weighted average rating
brand_weighted_avg_df = brand_weighted_avg_df.withColumn(
    "Weighted Average Rating", 
    F.col("Total Weighted Rating") / F.col("Total Number of Reviews")
)

# Sort results
top_10_df = brand_weighted_avg_df.orderBy(F.col("Weighted Average Rating").desc()).limit(10)
bottom_10_df = brand_weighted_avg_df.orderBy(F.col("Weighted Average Rating").asc()).limit(10)

print("Top 10 Brands by Weighted Average Rating:")
top_10_df.show(truncate=False)

print("Bottom 10 Brands by Weighted Average Rating:")
bottom_10_df.show(truncate=False)

Top 10 Brands by Weighted Average Rating:
+-----------------+---------------------+-----------------------+-----------------------+
|Brand            |Total Weighted Rating|Total Number of Reviews|Weighted Average Rating|
+-----------------+---------------------+-----------------------+-----------------------+
|florence by mills|796.8                |166                    |4.8                    |
|Guerlain         |662.4                |138                    |4.8                    |
|Valentino        |82507.40000000001    |17298                  |4.769765290785062      |
|Snif             |29643.899999999994   |6217                   |4.768200096509569      |
|Sol de Janeiro   |23998.2              |5106                   |4.7                    |
|RMS Beauty       |578.1                |123                    |4.7                    |
|Prada            |80829.4              |17220                  |4.693925667828107      |
|Lancôme          |103433.00000000001   |22111            

In [18]:
# Convert Spark DataFrame to Glue DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df_with_weights, glueContext, "dynamic_frame")

# Saving the Glue DynamicFrame to S3
output_s3_path = "s3://my-data-source-bucket/your-output-folder/weightedratings.csv"

glueContext.write_dynamic_frame.from_options(
    dynamic_frame,  # Your DynamicFrame
    connection_type = "s3",
    connection_options = {
        "path": output_s3_path
    },
    format = "csv"  # Writing the data in CSV format
)

print(f"Data written to S3 path: {output_s3_path}")


Data written to S3 path: s3://my-data-source-bucket/your-output-folder/weightedratings.csv
