In [53]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
appName("Spark").\
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [54]:
spark

In [4]:
spark.sql("create table if not exists itv009820_retaildb.employees(emp_id int,emp_name string,dept_id int, salary int)")

In [5]:
spark.sql("select * from  itv009820_retaildb.employees")

emp_id,emp_name,dept_id,salary
3,sai pavan,1,30000
5,rajesh,2,16000
4,nagesh,2,15000
1,jeswanth,1,10000
2,naidu,1,20000


In [5]:
spark.sql("create table itv009820_retaildb.departments(dept_id int, dept_name string)")

In [6]:
spark.sql("select * from itv009820_retaildb.departments")

dept_id,dept_name
1,CSE
2,ECE
3,EEE


In [10]:
spark.sql("insert into itv009820_retaildb.employees values(2, 'naidu', 1, 20000)")
spark.sql("insert into itv009820_retaildb.employees values(3, 'sai pavan', 1, 30000)")
spark.sql("insert into itv009820_retaildb.employees values(4, 'nagesh', 2, 15000)")
spark.sql("insert into itv009820_retaildb.employees values(5, 'rajesh', 2, 16000)")
spark.sql("insert into itv009820_retaildb.departments values(1, 'CSE')")
spark.sql("insert into itv009820_retaildb.departments values(2, 'ECE')")
spark.sql("insert into itv009820_retaildb.departments values(3, 'EEE')")

In [8]:
spark.sql("""insert into itv009820_retaildb.employees values(1,'jeswanth',1,10000)""")

In [12]:
spark.sql("select * from itv009820_retaildb.employees order by emp_id")

emp_id,emp_name,dept_id,salary
1,jeswanth,1,10000
2,naidu,1,20000
3,sai pavan,1,30000
4,nagesh,2,15000
5,rajesh,2,16000


In [7]:
spark.sql("select * from itv009820_retaildb.employees e inner join itv009820_retaildb.departments d on e.dept_id==d.dept_id ")

emp_id,emp_name,dept_id,salary,dept_id.1,dept_name
3,sai pavan,1,30000,1,CSE
5,rajesh,2,16000,2,ECE
4,nagesh,2,15000,2,ECE
1,jeswanth,1,10000,1,CSE
2,naidu,1,20000,1,CSE


In [8]:
emp_df=spark.read.table("itv009820_retaildb.employees")
dept_df=spark.read.table("itv009820_retaildb.departments")

In [10]:
dept_df.show()

+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|      CSE|
|      2|      ECE|
|      3|      EEE|
+-------+---------+



In [14]:
join_df=emp_df.join(dept_df,emp_df.dept_id==dept_df.dept_id,"Inner").orderBy("emp_id").show()

+------+---------+-------+------+-------+---------+
|emp_id| emp_name|dept_id|salary|dept_id|dept_name|
+------+---------+-------+------+-------+---------+
|     1| jeswanth|      1| 10000|      1|      CSE|
|     2|    naidu|      1| 20000|      1|      CSE|
|     3|sai pavan|      1| 30000|      1|      CSE|
|     4|   nagesh|      2| 15000|      2|      ECE|
|     5|   rajesh|      2| 16000|      2|      ECE|
+------+---------+-------+------+-------+---------+



In [12]:
join_df.show()

+------+---------+-------+------+-------+---------+
|emp_id| emp_name|dept_id|salary|dept_id|dept_name|
+------+---------+-------+------+-------+---------+
|     3|sai pavan|      1| 30000|      1|      CSE|
|     5|   rajesh|      2| 16000|      2|      ECE|
|     4|   nagesh|      2| 15000|      2|      ECE|
|     1| jeswanth|      1| 10000|      1|      CSE|
|     2|    naidu|      1| 20000|      1|      CSE|
+------+---------+-------+------+-------+---------+



In [16]:
sales=[(1,'2024-07-23',10,4000),(2,'2024-07-25',5,1000),(3,'2024-08-23',3,2000),(5,'2024-07-30',8,6000)]

In [17]:
print(sales)

[(1, '2024-07-23', 10, 4000), (2, '2024-07-25', 5, 1000), (3, '2024-08-23', 3, 2000), (5, '2024-07-30', 8, 6000)]


In [20]:
sale_df=spark.createDataFrame(sales,["product_id","sales_date","quantity","price"])

In [21]:
sale_df.show()

+----------+----------+--------+-----+
|product_id|sales_date|quantity|price|
+----------+----------+--------+-----+
|         1|2024-07-23|      10| 4000|
|         2|2024-07-25|       5| 1000|
|         3|2024-08-23|       3| 2000|
|         5|2024-07-30|       8| 6000|
+----------+----------+--------+-----+



In [25]:
from pyspark.sql.functions import *

In [34]:
total_sales=sale_df.withColumn("total_sale",expr("quantity*price"))

In [35]:
total_sales.show()

+----------+----------+--------+-----+----------+
|product_id|sales_date|quantity|price|total_sale|
+----------+----------+--------+-----+----------+
|         1|2024-07-23|      10| 4000|     40000|
|         2|2024-07-25|       5| 1000|      5000|
|         3|2024-08-23|       3| 2000|      6000|
|         5|2024-07-30|       8| 6000|     48000|
+----------+----------+--------+-----+----------+



In [3]:
orders_data = [
    (1, 101, "2024-07-01", 250.00),
    (2, 102, "2024-07-01", 300.00),
    (3, 101, "2024-07-02", 150.00),
    (4, 103, "2024-07-02", 450.00),
    (5, 104, "2024-07-03", 200.00),
    (6, 102, "2024-07-03", 350.00),
    (7, 101, "2024-07-04", 400.00),
    (8, 105, "2024-07-04", 500.00),
    (9, 103, "2024-07-05", 300.00),
    (10, 104, "2024-07-05", 100.00),
    (11, 105, "2024-07-06", 450.00),
    (12, 102, "2024-07-06", 250.00)
]

# Create DataFrame
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "order_date", "amount"])

In [4]:
orders_df.show()

+--------+-----------+----------+------+
|order_id|customer_id|order_date|amount|
+--------+-----------+----------+------+
|       1|        101|2024-07-01| 250.0|
|       2|        102|2024-07-01| 300.0|
|       3|        101|2024-07-02| 150.0|
|       4|        103|2024-07-02| 450.0|
|       5|        104|2024-07-03| 200.0|
|       6|        102|2024-07-03| 350.0|
|       7|        101|2024-07-04| 400.0|
|       8|        105|2024-07-04| 500.0|
|       9|        103|2024-07-05| 300.0|
|      10|        104|2024-07-05| 100.0|
|      11|        105|2024-07-06| 450.0|
|      12|        102|2024-07-06| 250.0|
+--------+-----------+----------+------+



In [5]:
orders_df.createOrReplaceTempView("orders")

In [8]:
result_df=spark.sql("select customer_id from orders group by customer_id having count(*)>2")

In [10]:
result_df.show()

+-----------+
|customer_id|
+-----------+
|        101|
|        102|
+-----------+



In [10]:
from pyspark.sql.functions import count,countDistinct,col,sum as _sum

In [21]:
result_df1=orders_df.groupby("customer_id").agg(count("*")).alias("order_count")

In [16]:
count_dist_df1=orders_df.groupby("customer_id").agg(countDistinct("*")).alias("order_count")

In [17]:
count_dist_df1.show()

+-----------+-----------------------+
|customer_id|count(unresolvedstar())|
+-----------+-----------------------+
|        103|                      2|
|        104|                      2|
|        105|                      2|
|        101|                      3|
|        102|                      3|
+-----------+-----------------------+



In [24]:
result_df1.filter(col("count(1)")>2).show()

+-----------+--------+
|customer_id|count(1)|
+-----------+--------+
|        101|       3|
|        102|       3|
+-----------+--------+



In [25]:
###
Question 4: PySpark - Window Functions
Scenario: You have a DataFrame transactions with columns customer_id, transaction_date, and amount.

Task: Write a PySpark code to calculate the running total of the transaction amounts for each customer.
###

SyntaxError: invalid syntax (<ipython-input-25-db3a9f89d31d>, line 2)

In [3]:
transactions_data = [
    (101, "2024-07-01", 100.00),
    (102, "2024-07-01", 200.00),
    (101, "2024-07-02", 150.00),
    (103, "2024-07-02", 250.00),
    (101, "2024-07-03", 200.00),
    (102, "2024-07-03", 300.00),
    (104, "2024-07-03", 400.00),
    (103, "2024-07-04", 150.00),
    (104, "2024-07-04", 100.00),
    (101, "2024-07-05", 250.00),
    (102, "2024-07-05", 150.00),
    (103, "2024-07-06", 200.00),
    (104, "2024-07-06", 300.00),
    (105, "2024-07-06", 500.00)
]

In [4]:
transaction_df=spark.createDataFrame(transactions_data,["customer_id","transaction_date","amount"])

In [5]:
transaction_df.show()

+-----------+----------------+------+
|customer_id|transaction_date|amount|
+-----------+----------------+------+
|        101|      2024-07-01| 100.0|
|        102|      2024-07-01| 200.0|
|        101|      2024-07-02| 150.0|
|        103|      2024-07-02| 250.0|
|        101|      2024-07-03| 200.0|
|        102|      2024-07-03| 300.0|
|        104|      2024-07-03| 400.0|
|        103|      2024-07-04| 150.0|
|        104|      2024-07-04| 100.0|
|        101|      2024-07-05| 250.0|
|        102|      2024-07-05| 150.0|
|        103|      2024-07-06| 200.0|
|        104|      2024-07-06| 300.0|
|        105|      2024-07-06| 500.0|
+-----------+----------------+------+



In [11]:
from pyspark.sql.window import Window

In [12]:
window_spec=Window.partitionBy("customer_id").orderBy("transaction_date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [13]:
trans_df=transaction_df.withColumn("transaction_amount",_sum("amount").over(window_spec))

In [14]:
trans_df.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_amount: double (nullable = true)



In [15]:
trans_df.show()

+-----------+----------------+------+------------------+
|customer_id|transaction_date|amount|transaction_amount|
+-----------+----------------+------+------------------+
|        103|      2024-07-02| 250.0|             250.0|
|        103|      2024-07-04| 150.0|             400.0|
|        103|      2024-07-06| 200.0|             600.0|
|        104|      2024-07-03| 400.0|             400.0|
|        104|      2024-07-04| 100.0|             500.0|
|        104|      2024-07-06| 300.0|             800.0|
|        105|      2024-07-06| 500.0|             500.0|
|        101|      2024-07-01| 100.0|             100.0|
|        101|      2024-07-02| 150.0|             250.0|
|        101|      2024-07-03| 200.0|             450.0|
|        101|      2024-07-05| 250.0|             700.0|
|        102|      2024-07-01| 200.0|             200.0|
|        102|      2024-07-03| 300.0|             500.0|
|        102|      2024-07-05| 150.0|             650.0|
+-----------+----------------+-

In [16]:
Question 8: SQL - Aggregations
Scenario: You have a table sales with columns product_id, sale_date, units_sold, and revenue.

Task: Write a SQL query to find the total revenue and total units sold for each month.

SyntaxError: invalid syntax (<ipython-input-16-751ebeb8e64c>, line 1)

In [55]:
spark.sql("CREATE TABLE if not exists itv009820_retaildb.sales (product_id INT,sale_date DATE,units_sold INT,revenue DECIMAL(10, 2));")

In [56]:
spark.sql("""
INSERT INTO itv009820_retaildb.sales VALUES 
(1, DATE '2024-01-05', 10, 100.00),
(2, DATE '2024-01-15', 20, 300.00),
(3, DATE '2024-02-20', 15, 150.00),
(1, DATE '2024-02-25', 30, 300.00),
(2, DATE '2024-03-05', 25, 250.00),
(3, DATE '2024-03-10', 5, 50.00),
(1, DATE '2024-03-15', 35, 350.00),
(2, DATE '2024-04-05', 40, 400.00),
(3, DATE '2024-04-20', 10, 100.00),
(1, DATE '2024-04-25', 50, 500.00);
""")

In [57]:
spark.sql("select * from itv009820_retaildb.sales")

product_id,sale_date,units_sold,revenue
1,2024-01-05,10,100.0
2,2024-01-15,20,300.0
3,2024-02-20,15,150.0
1,2024-02-25,30,300.0
2,2024-03-05,25,250.0
1,2024-01-05,10,100.0
2,2024-01-15,20,300.0
3,2024-02-20,15,150.0
1,2024-02-25,30,300.0
2,2024-03-05,25,250.0


In [58]:
retaildb_df=spark.read.table("itv009820_retaildb.sales")

In [59]:
from pyspark.sql.functions import *

In [60]:
formatted_df = retaildb_df.select(date_format('sale_date', 'MM-dd-yyyy').alias('date'))

In [63]:
retaildb_df=retaildb_df.withColumn("date",date_format('sale_date','MM-yyyy'))

In [70]:
result_df=retaildb_df.groupby("date").agg(sum("units_sold").alias("total_units_sold"),sum("revenue").alias("total_revenue"))

In [71]:
result_df.show()

+-------+----------------+-------------+
|   date|total_units_sold|total_revenue|
+-------+----------------+-------------+
|03-2024|             130|      1300.00|
|01-2024|              60|       800.00|
|04-2024|             200|      2000.00|
|02-2024|              90|       900.00|
+-------+----------------+-------------+

