In [2]:
// Assume you're given the tables containing completed trade orders and user details in a Robinhood trading system.

// Write a query to retrieve the top three cities that have the highest number of completed trade orders listed in descending order. 
// Output the city name and the corresponding number of completed trade orders.

// Example Output:
// ----------------|--------------
// city            |  total_orders
// ----------------|---------------
// San Francisco   |      3
// Boston          |      2
// Denver          |      1



val trades_df = Seq(
  (100101,111,9.80,10,"Cancelled","08/17/2022 12:00:00"),
  (100102,111,10.00,10,"Completed","08/17/2022 12:00:00"),
  (100259,148,5.10,35,"Completed","08/25/2022 12:00:00"),
  (100264,148,4.80,40,"Completed","08/26/2022 12:00:00"),
  (100305,300,10.00,15,"Completed","09/05/2022 12:00:00"),
  (100400,178,9.90,15,"Completed","09/09/2022 12:00:00"),
  (100565,265,25.60,5,"Completed","12/19/2022 12:00:00")
).toDF("order_id","user_id","price","quantity","status","timestamp")


val users_df = Seq(
  (111,"San Francisco","rrok10@gmail.com","08/03/2021 12:00:00"),
  (148,"Boston","sailor9820@gmail.com","08/20/2021 12:00:00"),
  (178,"San Francisco","harrypotterfan182@gmail.com","01/05/2022 12:00:00"),
  (265,"Denver","shadower_@hotmail.com","02/26/2022 12:00:00"),
  (300,"San Francisco","houstoncowboy1122@hotmail.com","06/30/2022 12:00:00")
).toDF("user_id","city","email","signup_date")


trades_df.show(false)
users_df.show(false)


+--------+-------+-----+--------+---------+-------------------+
|order_id|user_id|price|quantity|status   |timestamp          |
+--------+-------+-----+--------+---------+-------------------+
|100101  |111    |9.8  |10      |Cancelled|08/17/2022 12:00:00|
|100102  |111    |10.0 |10      |Completed|08/17/2022 12:00:00|
|100259  |148    |5.1  |35      |Completed|08/25/2022 12:00:00|
|100264  |148    |4.8  |40      |Completed|08/26/2022 12:00:00|
|100305  |300    |10.0 |15      |Completed|09/05/2022 12:00:00|
|100400  |178    |9.9  |15      |Completed|09/09/2022 12:00:00|
|100565  |265    |25.6 |5       |Completed|12/19/2022 12:00:00|
+--------+-------+-----+--------+---------+-------------------+

+-------+-------------+-----------------------------+-------------------+
|user_id|city         |email                        |signup_date        |
+-------+-------------+-----------------------------+-------------------+
|111    |San Francisco|rrok10@gmail.com             |08/03/2021 12:00:00|

trades_df: org.apache.spark.sql.DataFrame = [order_id: int, user_id: int ... 4 more fields]
users_df: org.apache.spark.sql.DataFrame = [user_id: int, city: string ... 2 more fields]


In [7]:
// Using Dataframe API

val df1 = trades_df.as("t").join(users_df.as("u"), $"t.user_id" === $"u.user_id", "left"
                                ).where($"t.status" === "Completed"
                                       ).groupBy($"u.city").agg(count($"t.order_id").as("total_orders")
                                                               ).orderBy($"total_orders".desc).limit(3)
df1.explain()
df1.show(false)


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=3, orderBy=[total_orders#360L DESC NULLS LAST], output=[city#139,total_orders#360L])
   +- HashAggregate(keys=[city#139], functions=[count(1)])
      +- Exchange hashpartitioning(city#139, 200), ENSURE_REQUIREMENTS, [plan_id=448]
         +- HashAggregate(keys=[city#139], functions=[partial_count(1)])
            +- Project [city#139]
               +- BroadcastHashJoin [user_id#114], [user_id#138], LeftOuter, BuildRight, false
                  :- LocalTableScan [user_id#114]
                  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=443]
                     +- LocalTableScan [user_id#138, city#139]


+-------------+------------+
|city         |total_orders|
+-------------+------------+
|San Francisco|3           |
|Boston       |2           |
|Denver       |1           |
+-------------+------------+



df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [city: string, total_orders: bigint]


In [8]:
// Using SPARK SQL

trades_df.createOrReplaceTempView("trades")
users_df.createOrReplaceTempView("users")

val df2 = spark.sql("""
    SELECT
        u.city,
        COUNT(t.order_id) total_orders
    FROM trades t 
    LEFT JOIN users u
    ON t.user_id=u.user_id
    WHERE status='Completed'
    GROUP BY u.city
    ORDER BY total_orders DESC
    LIMIT 3
""")

df2.explain()
df2.show(false)


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=3, orderBy=[total_orders#376L DESC NULLS LAST], output=[city#139,total_orders#376L])
   +- HashAggregate(keys=[city#139], functions=[count(1)])
      +- Exchange hashpartitioning(city#139, 200), ENSURE_REQUIREMENTS, [plan_id=597]
         +- HashAggregate(keys=[city#139], functions=[partial_count(1)])
            +- Project [city#139]
               +- BroadcastHashJoin [user_id#114], [user_id#138], LeftOuter, BuildRight, false
                  :- LocalTableScan [user_id#114]
                  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=592]
                     +- LocalTableScan [user_id#138, city#139]


+-------------+------------+
|city         |total_orders|
+-------------+------------+
|San Francisco|3           |
|Boston       |2           |
|Denver       |1           |
+-------------+------------+



df2: org.apache.spark.sql.DataFrame = [city: string, total_orders: bigint]
