In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,expr,substring,regexp_replace,desc
from pyspark.sql.functions import max
from pyspark.sql.types import LongType

In [2]:
spark = SparkSession.builder \
    .appName("Phone_deals") \
    .config("spark.mongodb.input.uri","mongodb://127.0.0.1/phone.details") \
    .config("spark.mongodb.output.uri","mongodb://127.0.0.1/phone.details") \
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:2.4.2')\
    .getOrCreate()

:: loading settings :: url = jar:file:/home/kafka/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/kafka/.ivy2/cache
The jars for the packages stored in: /home/kafka/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6286a333-d24a-45b3-bf80-ce381be621d8;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;2.4.2 in central
	found org.mongodb#mongo-java-driver;3.12.5 in central
:: resolution report :: resolve 631ms :: artifacts dl 10ms
	:: modules in use:
	org.mongodb#mongo-java-driver;3.12.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;2.4.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   

In [3]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()


                                                                                

root
 |-- No_of_reviews: string (nullable = true)
 |-- Phone_name: string (nullable = true)
 |-- Phone_price: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [4]:
df = df[df.columns[:-1]]
print(df.show())

+-------------+--------------------+-----------+------+
|No_of_reviews|          Phone_name|Phone_price|Rating|
+-------------+--------------------+-----------+------+
|     (37,008)|realme C55 (Rainf...|    ₹10,999|   4.4|
|      (9,245)|realme GT 2 (Stee...|    ₹39,999|   4.3|
|     (21,867)|POCO C55 (Cool Bl...|     ₹7,749|   4.2|
|      (7,266)|POCO M5 (Icy Blue...|     ₹8,999|   4.2|
|      (7,726)|POCO X5 Pro 5G (H...|    ₹21,999|   4.3|
|     (14,255)|POCO C51 (Royal B...|     ₹6,999|   4.3|
|     (76,446)|POCO M4 5G (Yello...|    ₹10,999|   4.2|
|      (2,472)|POCO F5 5G (Snows...|    ₹29,999|   4.2|
|     (24,136)|SAMSUNG Galaxy F0...|     ₹7,299|   4.2|
|     (24,136)|SAMSUNG Galaxy F0...|     ₹7,299|   4.2|
|   (1,92,340)|SAMSUNG Galaxy F2...|    ₹15,499|   4.3|
|   (1,50,552)|SAMSUNG Galaxy F1...|     ₹9,699|   4.3|
|      (3,197)|SAMSUNG Galaxy S2...|    ₹54,999|   4.5|
|        (688)|SAMSUNG Galaxy S2...|  ₹1,24,999|   4.3|
|     (36,177)|vivo T2x 5G (Auro...|    ₹13,999|

In [5]:
df = df.withColumn("Rating",col("Rating").cast("float"))

In [6]:
df = df.withColumn("No_of_reviews",expr("substring(No_of_reviews, 2, length(No_of_reviews)-2)"))
df = df.withColumn("No_of_reviews", regexp_replace(col("No_of_reviews"), ",", "").cast(LongType()))

In [7]:
df = df.withColumn("Phone_price", substring(col("Phone_price"), 2, 100))
df = df.withColumn("Phone_price", regexp_replace(col("Phone_price"), ",", "").cast(LongType()))

In [8]:
print(df.show())

+-------------+--------------------+-----------+------+
|No_of_reviews|          Phone_name|Phone_price|Rating|
+-------------+--------------------+-----------+------+
|        37008|realme C55 (Rainf...|      10999|   4.4|
|         9245|realme GT 2 (Stee...|      39999|   4.3|
|        21867|POCO C55 (Cool Bl...|       7749|   4.2|
|         7266|POCO M5 (Icy Blue...|       8999|   4.2|
|         7726|POCO X5 Pro 5G (H...|      21999|   4.3|
|        14255|POCO C51 (Royal B...|       6999|   4.3|
|        76446|POCO M4 5G (Yello...|      10999|   4.2|
|         2472|POCO F5 5G (Snows...|      29999|   4.2|
|        24136|SAMSUNG Galaxy F0...|       7299|   4.2|
|        24136|SAMSUNG Galaxy F0...|       7299|   4.2|
|       192340|SAMSUNG Galaxy F2...|      15499|   4.3|
|       150552|SAMSUNG Galaxy F1...|       9699|   4.3|
|         3197|SAMSUNG Galaxy S2...|      54999|   4.5|
|          688|SAMSUNG Galaxy S2...|     124999|   4.3|
|        36177|vivo T2x 5G (Auro...|      13999|

In [9]:
df1 = df.filter((df["Phone_price"] < 10000))
Under_ten = df1.sort(desc("Phone_price"),desc("Rating"),desc("No_of_reviews")).limit(5)
print(Under_ten.show(truncate=False))

+-------------+------------------------------------------+-----------+------+
|No_of_reviews|Phone_name                                |Phone_price|Rating|
+-------------+------------------------------------------+-----------+------+
|20029        |MOTOROLA g13 (Matte Charcoal, 128 GB)     |9999       |3.9   |
|20029        |MOTOROLA g13 (Matte Charcoal, 128 GB)     |9999       |3.9   |
|150552       |SAMSUNG Galaxy F13 (Waterfall Blue, 64 GB)|9699       |4.3   |
|150552       |SAMSUNG Galaxy F13 (Waterfall Blue, 64 GB)|9699       |4.3   |
|232849       |REDMI 10 (Caribbean Green, 64 GB)         |9499       |4.3   |
+-------------+------------------------------------------+-----------+------+

None


In [10]:
df2 = df.filter((df["Phone_price"] <= 20000))
Under_twenty = df2.sort(desc("Phone_price"),desc("Rating"),desc("No_of_reviews")).limit(5)
print(Under_twenty.show(truncate=False))

+-------------+--------------------------------------+-----------+------+
|No_of_reviews|Phone_name                            |Phone_price|Rating|
+-------------+--------------------------------------+-----------+------+
|17362        |realme 10 Pro 5G (Nebula Blue, 128 GB)|19999      |4.3   |
|21900        |vivo T2 5G (Nitro Blaze, 128 GB)      |18999      |4.5   |
|21900        |vivo T2 5G (Nitro Blaze, 128 GB)      |18999      |4.5   |
|69636        |OPPO K10 5G (Ocean Blue, 128 GB)      |17499      |4.4   |
|69636        |OPPO K10 5G (Ocean Blue, 128 GB)      |17499      |4.4   |
+-------------+--------------------------------------+-----------+------+

None


In [11]:
df3 = df.filter((df["Phone_price"] <= 30000))
Under_thirty = df3.sort(desc("Phone_price"),desc("Rating"),desc("No_of_reviews")).limit(5)
print(Under_thirty.show(truncate=False))

+-------------+--------------------------------------------+-----------+------+
|No_of_reviews|Phone_name                                  |Phone_price|Rating|
+-------------+--------------------------------------------+-----------+------+
|10945        |REDMI Note 12 Pro+ 5G (Iceberg Blue, 256 GB)|29999      |4.3   |
|10945        |REDMI Note 12 Pro+ 5G (Iceberg Blue, 256 GB)|29999      |4.3   |
|9867         |OPPO Reno8T 5G (Sunrise Gold, 128 GB)       |29999      |4.3   |
|9867         |OPPO Reno8T 5G (Sunrise Gold, 128 GB)       |29999      |4.3   |
|2472         |POCO F5 5G (Snowstorm White, 256 GB)        |29999      |4.2   |
+-------------+--------------------------------------------+-----------+------+

None


In [12]:
df4 = df.filter((df["Phone_price"] <= 50000))
Under_fifty = df4.sort(desc("Phone_price"),desc("Rating"),desc("No_of_reviews")).limit(5)
print(Under_fifty.show(truncate=False))

+-------------+-----------------------------------+-----------+------+
|No_of_reviews|Phone_name                         |Phone_price|Rating|
+-------------+-----------------------------------+-----------+------+
|4541         |Google Pixel 7 (Lemongrass, 128 GB)|49999      |4.4   |
|4541         |Google Pixel 7 (Snow, 128 GB)      |49999      |4.4   |
|4541         |Google Pixel 7 (Snow, 128 GB)      |49999      |4.4   |
|4541         |Google Pixel 7 (Obsidian, 128 GB)  |49999      |4.4   |
|4541         |Google Pixel 7 (Obsidian, 128 GB)  |49999      |4.4   |
+-------------+-----------------------------------+-----------+------+

None


In [13]:
df5 = df.filter((df["Phone_price"] > 50000))
greater_than_fifty = df5.sort(desc("Phone_price"),desc("Rating"),desc("No_of_reviews")).limit(5)
print(greater_than_fifty.show(truncate=False))

+-------------+-------------------------------------------+-----------+------+
|No_of_reviews|Phone_name                                 |Phone_price|Rating|
+-------------+-------------------------------------------+-----------+------+
|688          |SAMSUNG Galaxy S23 Ultra 5G (Green, 256 GB)|124999     |4.3   |
|688          |SAMSUNG Galaxy S23 Ultra 5G (Green, 256 GB)|124999     |4.3   |
|209          |OPPO Find N2 Flip (Moonlit Purple, 256 GB) |89999      |4.3   |
|209          |OPPO Find N2 Flip (Astral Black, 256 GB)   |89999      |4.3   |
|209          |OPPO Find N2 Flip (Astral Black, 256 GB)   |89999      |4.3   |
+-------------+-------------------------------------------+-----------+------+

None
