In [1]:
# ps-46
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 3.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=50edc81612cc8215113c9f316b8b1a508f4e2a816cbd0d28d63f1c340c3da823
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
# import necessary modules
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Explicit names, imported after the wildcards so nothing above can shadow them.
# `from pyspark.sql.functions import *` shadows Python builtins such as sum/max/min,
# so prefer calling Spark functions through the `F.` namespace (F.sum, F.max, ...).
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [10]:
# Get (or reuse, via getOrCreate) a Spark session running in local mode for this notebook.
spark = (
    SparkSession.builder
    .master('local')
    .appName('pyspark-day-10')
    .getOrCreate()
)
spark  # last expression: display the session summary

In [11]:
 # Define the schema for Product table
product_schema = StructType(
    [
        StructField(field_name, field_type, True)  # every column is nullable
        for field_name, field_type in (
            ("product_id", IntegerType()),
            ("product_name", StringType()),
            ("unit_price", IntegerType()),
        )
    ]
)
# Rows for the Product table, one (product_id, product_name, unit_price) tuple each
product_data = [
    (1, "S8", 1000),
    (2, "G4", 800),
    (3, "iPhone", 1400),
]

In [5]:
# Schema for the Sales table; sale_date is kept as a plain "YYYY-MM-DD" string.
sales_schema = StructType(
    [
        StructField(field_name, field_type, True)  # every column is nullable
        for field_name, field_type in (
            ("seller_id", IntegerType()),
            ("product_id", IntegerType()),
            ("buyer_id", IntegerType()),
            ("sale_date", StringType()),
            ("quantity", IntegerType()),
            ("price", IntegerType()),
        )
    ]
)
# Rows for the Sales table:
# (seller_id, product_id, buyer_id, sale_date, quantity, price)
sales_data = [
    (1, 1, 1, "2019-01-21", 2, 2000),
    (1, 2, 2, "2019-02-17", 1, 800),
    (2, 2, 3, "2019-06-02", 1, 800),
    (3, 3, 4, "2019-05-13", 2, 2800),
]

In [12]:
# Build the Product DataFrame from product_data/product_schema and preview it.
product_df = spark.createDataFrame(product_data, product_schema)
product_df.show(n=20, truncate=True)

+----------+------------+----------+
|product_id|product_name|unit_price|
+----------+------------+----------+
|         1|          S8|      1000|
|         2|          G4|       800|
|         3|      iPhone|      1400|
+----------+------------+----------+



In [13]:
# Build the Sales DataFrame from sales_data/sales_schema and preview it.
sales_df = spark.createDataFrame(sales_data, sales_schema)
sales_df.show(n=20, truncate=True)

+---------+----------+--------+----------+--------+-----+
|seller_id|product_id|buyer_id| sale_date|quantity|price|
+---------+----------+--------+----------+--------+-----+
|        1|         1|       1|2019-01-21|       2| 2000|
|        1|         2|       2|2019-02-17|       1|  800|
|        2|         2|       3|2019-06-02|       1|  800|
|        3|         3|       4|2019-05-13|       2| 2800|
+---------+----------+--------+----------+--------+-----+



In [14]:
# Print the column names/types of product_df (built from product_schema).
product_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- unit_price: integer (nullable = true)



In [15]:
# Print the column names/types of sales_df (built from sales_schema).
sales_df.printSchema()

root
 |-- seller_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- buyer_id: integer (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: integer (nullable = true)



In [19]:
# Total sales revenue per seller (sum of `price`), highest first.
# `seller_id` is a secondary sort key so that tied sellers (1 and 3 both total 2800
# here) always come out in the same order, keeping this output reproducible.
result_df = (
    sales_df
    .groupBy('seller_id')
    .agg(F.sum('price').alias('total_price'))
    .orderBy(F.col('total_price').desc(), F.col('seller_id').asc())
)
result_df.show()

+---------+-----------+
|seller_id|total_price|
+---------+-----------+
|        1|       2800|
|        3|       2800|
|        2|        800|
+---------+-----------+



In [27]:
# Keep only the highest total: the first row of the descending sort in result_df.
# Stored under a new name so result_df keeps the per-seller totals shown above
# and re-running this cell always produces the same frame.
top_total_df = result_df.select('total_price').limit(1)
top_total_df.show()

+-----------+
|total_price|
+-----------+
|       2800|
+-----------+



In [30]:
# Highest per-seller total as a plain Python value (2800 for this data).
# An explicit max aggregate does not depend on how result_df happens to be sorted,
# and on an empty frame it yields None instead of raising on `None['total_price']`.
total_sales = (
    result_df
    .agg(F.max('total_price').alias('max_total_price'))
    .first()['max_total_price']
)

In [31]:
# Every seller whose total equals the top total, so ties are all reported.
# Totals are re-aggregated from sales_df so this cell does not depend on which
# columns result_df currently holds; sorted by seller_id because groupBy output
# order is not guaranteed.
best_sellers_df = (
    sales_df
    .groupBy('seller_id')
    .agg(F.sum('price').alias('total_price'))
    .filter(F.col('total_price') == total_sales)
    .orderBy('seller_id')
)
best_sellers_df.show()

+---------+-----------+
|seller_id|total_price|
+---------+-----------+
|        1|       2800|
|        3|       2800|
+---------+-----------+



In [None]:
# Alternative: rank sellers by total sales with a window function and keep rank 1.
# rank() gives tied totals the same rank, so every top seller is kept in a single
# query, without first collecting the max value into a Python variable.
# NOTE: the window has no partitionBy, so Spark moves all rows into one partition
# (it logs a warning); fine for a small per-seller summary like this one.
sales_rank_window = Window.orderBy(F.col('total_price').desc())
best_sellers_window_df = (
    sales_df
    .groupBy('seller_id')
    .agg(F.sum('price').alias('total_price'))
    .withColumn('sales_rank', F.rank().over(sales_rank_window))
    .filter(F.col('sales_rank') == 1)
    .drop('sales_rank')
    .orderBy('seller_id')
)
best_sellers_window_df.show()