
<p style="font-size: 24px; background: -webkit-linear-gradient(left, #FFD700, #FFAA00); -webkit-background-clip: text; -webkit-text-fill-color: transparent;">
  50 Advanced Premium Pyspark SQL Problems
</p>

![Databricks Logo](files/advancdsql.png)

# Select

## (Premium) 1821. Find Customers With Positive Revenue this Year
### Level: Easy
```
Table: Customers

+--------------+------+
| Column Name  | Type |
+--------------+------+
| customer_id  | int  |
| year         | int  |
| revenue      | int  |
+--------------+------+
(customer_id, year) is the primary key (combination of columns with unique values) for this table.
This table contains the customer ID and the revenue of customers in different years.
Note that this revenue can be negative.
 

Write a solution to report the customers with postive revenue in the year 2021.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Customers table:
+-------------+------+---------+
| customer_id | year | revenue |
+-------------+------+---------+
| 1           | 2018 | 50      |
| 1           | 2021 | 30      |
| 1           | 2020 | 70      |
| 2           | 2021 | -50     |
| 3           | 2018 | 10      |
| 3           | 2016 | 50      |
| 4           | 2021 | 20      |
+-------------+------+---------+
Output: 
+-------------+
| customer_id |
+-------------+
| 1           |
| 4           |
+-------------+
Explanation: 
Customer 1 has revenue equal to 30 in the year 2021.
Customer 2 has revenue equal to -50 in the year 2021.
Customer 3 has no revenue in the year 2021.
Customer 4 has revenue equal to 20 in the year 2021.
Thus only customers 1 and 4 have positive revenue in the year 2021.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Assuming spark session already exists
# spark = SparkSession.builder.appName("Example").getOrCreate()

# Sample data
data = [[1, 2018, 50], [1, 2021, 30], [1, 2020, 70], [2, 2021, -50], [3, 2018, 10], [3, 2016, 50], [4, 2021, 20]]

# Define schema
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("revenue", IntegerType(), True)
])

# Create DataFrame
customers_df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
customers_df.show()


+-----------+----+-------+
|customer_id|year|revenue|
+-----------+----+-------+
|          1|2018|     50|
|          1|2021|     30|
|          1|2020|     70|
|          2|2021|    -50|
|          3|2018|     10|
|          3|2016|     50|
|          4|2021|     20|
+-----------+----+-------+



In [0]:
#Solution:
from pyspark.sql.functions import col
customers_df.filter((col('year') == 2021) & (col('revenue') > 0)).select(col('customer_id')).show()

+-----------+
|customer_id|
+-----------+
|          1|
|          4|
+-----------+



## 183. Customers Who Never Order
### Level: Easy
```
Table: Customers

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| id          | int     |
| name        | varchar |
+-------------+---------+
id is the primary key (column with unique values) for this table.
Each row of this table indicates the ID and name of a customer.
 

Table: Orders

+-------------+------+
| Column Name | Type |
+-------------+------+
| id          | int  |
| customerId  | int  |
+-------------+------+
id is the primary key (column with unique values) for this table.
customerId is a foreign key (reference columns) of the ID from the Customers table.
Each row of this table indicates the ID of an order and the ID of the customer who ordered it.
 

Write a solution to find all customers who never order anything.

Return the result table in any order.

The result format is in the following example.

Example 1:

Input: 
Customers table:
+----+-------+
| id | name  |
+----+-------+
| 1  | Joe   |
| 2  | Henry |
| 3  | Sam   |
| 4  | Max   |
+----+-------+
Orders table:
+----+------------+
| id | customerId |
+----+------------+
| 1  | 3          |
| 2  | 1          |
+----+------------+
Output: 
+-----------+
| Customers |
+-----------+
| Henry     |
| Max       |
+-----------+
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Assuming spark session already exists
# spark = SparkSession.builder.appName("Example").getOrCreate()

# Sample data for customers
data_customers = [[1, 'Joe'], [2, 'Henry'], [3, 'Sam'], [4, 'Max']]

# Define schema for customers
schema_customers = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Create DataFrame for customers
customers_df = spark.createDataFrame(data_customers, schema=schema_customers)

# Show the DataFrame for customers
customers_df.show()

# Sample data for orders
data_orders = [[1, 3], [2, 1]]

# Define schema for orders
schema_orders = StructType([
    StructField("id", IntegerType(), True),
    StructField("customerId", IntegerType(), True)
])

# Create DataFrame for orders
orders_df = spark.createDataFrame(data_orders, schema=schema_orders)

# Show the DataFrame for orders
orders_df.show()


+---+-----+
| id| name|
+---+-----+
|  1|  Joe|
|  2|Henry|
|  3|  Sam|
|  4|  Max|
+---+-----+

+---+----------+
| id|customerId|
+---+----------+
|  1|         3|
|  2|         1|
+---+----------+



In [0]:
#solution 1:

order_ids = [row.customerId for row in orders_df.select('customerId').distinct().collect()]
customers_df.filter(~col('id').isin(order_ids)).select(col('name').alias('Customers')).show()

+---------+
|Customers|
+---------+
|    Henry|
|      Max|
+---------+



In [0]:
# Solution 2:
order_ids_df = orders_df.select(col('customerId').alias('id')).distinct()

customers_not_in_orders_df = customers_df.join(
    order_ids_df,
    on='id',
    how='left_anti'
)

customers_not_in_orders_df.select(col('name').alias('Customers')).show()

+---------+
|Customers|
+---------+
|    Henry|
|      Max|
+---------+



## 1873. Calculate Special Bonus
### Level: Easy
```
Table: Employees

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| employee_id | int     |
| name        | varchar |
| salary      | int     |
+-------------+---------+
employee_id is the primary key (column with unique values) for this table.
Each row of this table indicates the employee ID, employee name, and salary.
 

Write a solution to calculate the bonus of each employee. The bonus of an employee is 100% of their salary
 if the ID of the employee is an odd number and the employee's name does not start with the character
  'M'. The bonus of an employee is 0 otherwise.

Return the result table ordered by employee_id.

The result format is in the following example.

 

Example 1:

Input: 
Employees table:
+-------------+---------+--------+
| employee_id | name    | salary |
+-------------+---------+--------+
| 2           | Meir    | 3000   |
| 3           | Michael | 3800   |
| 7           | Addilyn | 7400   |
| 8           | Juan    | 6100   |
| 9           | Kannon  | 7700   |
+-------------+---------+--------+
Output: 
+-------------+-------+
| employee_id | bonus |
+-------------+-------+
| 2           | 0     |
| 3           | 0     |
| 7           | 7400  |
| 8           | 0     |
| 9           | 7700  |
+-------------+-------+
Explanation: 
The employees with IDs 2 and 8 get 0 bonus because they have an even employee_id.
The employee with ID 3 gets 0 bonus because their name starts with 'M'.
The rest of the employees get a 100% bonus.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema for the DataFrame
schema = StructType([
    StructField("employee_id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("salary", IntegerType(), nullable=False)
])

# Create a list of rows from the pandas DataFrame
data = [
    (2, 'Meir', 3000),
    (3, 'Michael', 3800),
    (7, 'Addilyn', 7400),
    (8, 'Juan', 6100),
    (9, 'Kannon', 7700)
]

# Create a PySpark DataFrame from the data and schema
employees_df = spark.createDataFrame(data, schema=schema)
employees_df.show() 

+-----------+-------+------+
|employee_id|   name|salary|
+-----------+-------+------+
|          2|   Meir|  3000|
|          3|Michael|  3800|
|          7|Addilyn|  7400|
|          8|   Juan|  6100|
|          9| Kannon|  7700|
+-----------+-------+------+



In [0]:
# Solution:
from pyspark.sql.functions import *
employees_df.withColumn('bonus',\
                        when(((col('employee_id') % 2 != 0) & (~col('name').like('M%'))), col('salary')).otherwise(0))\
    .select('employee_id','bonus').orderBy(col('employee_id')).show()  

+-----------+-----+
|employee_id|bonus|
+-----------+-----+
|          2|    0|
|          3|    0|
|          7| 7400|
|          8|    0|
|          9| 7700|
+-----------+-----+



## (Premium)1398. Customers Who Bought Products A and B but Not C
### Level: Medium
```
Table: Customers

+---------------------+---------+
| Column Name         | Type    |
+---------------------+---------+
| customer_id         | int     |
| customer_name       | varchar |
+---------------------+---------+
customer_id is the column with unique values for this table.
customer_name is the name of the customer.
 

Table: Orders

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| order_id      | int     |
| customer_id   | int     |
| product_name  | varchar |
+---------------+---------+
order_id is the column with unique values for this table.
customer_id is the id of the customer who bought the product "product_name".
 

Write a solution to report the customer_id and customer_name of customers who bought products "A", "B" but did not buy the product "C" since we want to recommend them to purchase this product.

Return the result table ordered by customer_id.

The result format is in the following example.

 

Example 1:

Input: 
Customers table:
+-------------+---------------+
| customer_id | customer_name |
+-------------+---------------+
| 1           | Daniel        |
| 2           | Diana         |
| 3           | Elizabeth     |
| 4           | Jhon          |
+-------------+---------------+
Orders table:
+------------+--------------+---------------+
| order_id   | customer_id  | product_name  |
+------------+--------------+---------------+
| 10         |     1        |     A         |
| 20         |     1        |     B         |
| 30         |     1        |     D         |
| 40         |     1        |     C         |
| 50         |     2        |     A         |
| 60         |     3        |     A         |
| 70         |     3        |     B         |
| 80         |     3        |     D         |
| 90         |     4        |     C         |
+------------+--------------+---------------+
Output: 
+-------------+---------------+
| customer_id | customer_name |
+-------------+---------------+
| 3           | Elizabeth     |
+-------------+---------------+
Explanation: Only the customer_id with id 3 bought the product A and B but not the product C.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema for the 'customers' DataFrame
customers_schema = StructType([
    StructField("customer_id", IntegerType(), nullable=False),
    StructField("customer_name", StringType(), nullable=True)
])

# Define the schema for the 'orders' DataFrame
orders_schema = StructType([
    StructField("order_id", IntegerType(), nullable=False),
    StructField("customer_id", IntegerType(), nullable=False),
    StructField("product_name", StringType(), nullable=True)
])

# Data for 'customers' DataFrame
customers_data = [
    (1, 'Daniel'),
    (2, 'Diana'),
    (3, 'Elizabeth'),
    (4, 'Jhon')
]

# Data for 'orders' DataFrame
orders_data = [
    (10, 1, 'A'),
    (20, 1, 'B'),
    (30, 1, 'D'),
    (40, 1, 'C'),
    (50, 2, 'A'),
    (60, 3, 'A'),
    (70, 3, 'B'),
    (80, 3, 'D'),
    (90, 4, 'C')
]

# Create PySpark DataFrames from the data and schemas
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)
customers_df.show()
orders_df.show()

+-----------+-------------+
|customer_id|customer_name|
+-----------+-------------+
|          1|       Daniel|
|          2|        Diana|
|          3|    Elizabeth|
|          4|         Jhon|
+-----------+-------------+

+--------+-----------+------------+
|order_id|customer_id|product_name|
+--------+-----------+------------+
|      10|          1|           A|
|      20|          1|           B|
|      30|          1|           D|
|      40|          1|           C|
|      50|          2|           A|
|      60|          3|           A|
|      70|          3|           B|
|      80|          3|           D|
|      90|          4|           C|
+--------+-----------+------------+



In [0]:
orders_df.alias('O').join(customers_df.alias('C'), col('O.customer_id') == col('C.customer_id'),'left')\
    .filter(col('product_name').isin('A','B','C')).groupBy(col('O.customer_id'),col('C.customer_name'))\
    .agg(count(col('product_name')).alias('noOfProductsBrought')).filter(col('noOfProductsBrought') == 2)\
        .drop(col('noOfProductsBrought')).show()

+-----------+-------------+
|customer_id|customer_name|
+-----------+-------------+
|          3|    Elizabeth|
+-----------+-------------+



## (Premium) 1112. Highest Grade For Each Student
### Level: Medium
```
Table: Enrollments

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| student_id    | int     |
| course_id     | int     |
| grade         | int     |
+---------------+---------+
(student_id, course_id) is the primary key (combination of columns with unique values) of this table.
grade is never NULL.
 

Write a solution to find the highest grade with its corresponding course for each student. In case of a tie, you should find the course with the smallest course_id.

Return the result table ordered by student_id in ascending order.

The result format is in the following example.

 

Example 1:

Input: 
Enrollments table:
+------------+-------------------+
| student_id | course_id | grade |
+------------+-----------+-------+
| 2          | 2         | 95    |
| 2          | 3         | 95    |
| 1          | 1         | 90    |
| 1          | 2         | 99    |
| 3          | 1         | 80    |
| 3          | 2         | 75    |
| 3          | 3         | 82    |
+------------+-----------+-------+
Output: 
+------------+-------------------+
| student_id | course_id | grade |
+------------+-----------+-------+
| 1          | 2         | 99    |
| 2          | 2         | 95    |
| 3          | 3         | 82    |
+------------+-----------+-------+
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Define the schema for the 'enrollments' DataFrame
enrollments_schema = StructType([
    StructField("student_id", IntegerType(), nullable=False),
    StructField("course_id", IntegerType(), nullable=False),
    StructField("grade", IntegerType(), nullable=False)
])

# Data for 'enrollments' DataFrame
enrollments_data = [
    (2, 2, 95),
    (2, 3, 95),
    (1, 1, 90),
    (1, 2, 99),
    (3, 1, 80),
    (3, 2, 75),
    (3, 3, 82)
]

# Create PySpark DataFrame from the data and schema
enrollments_df = spark.createDataFrame(enrollments_data, schema=enrollments_schema)
enrollments_df.show()

+----------+---------+-----+
|student_id|course_id|grade|
+----------+---------+-----+
|         2|        2|   95|
|         2|        3|   95|
|         1|        1|   90|
|         1|        2|   99|
|         3|        1|   80|
|         3|        2|   75|
|         3|        3|   82|
+----------+---------+-----+



In [0]:
#Solution:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
windowOptions = Window.partitionBy(col('student_id')).orderBy(col('grade').desc(),col('course_id'))

enrollments_df.withColumn('Rn',row_number().over(windowOptions)).filter(col('Rn') == 1).drop(col('Rn')).show()

+----------+---------+-----+
|student_id|course_id|grade|
+----------+---------+-----+
|         1|        2|   99|
|         2|        2|   95|
|         3|        3|   82|
+----------+---------+-----+



# Basic Joins

## 175. Combine Two Tables
### Level: Easy
```
Table: Person

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| personId    | int     |
| lastName    | varchar |
| firstName   | varchar |
+-------------+---------+
personId is the primary key (column with unique values) for this table.
This table contains information about the ID of some persons and their first and last names.
 

Table: Address

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| addressId   | int     |
| personId    | int     |
| city        | varchar |
| state       | varchar |
+-------------+---------+
addressId is the primary key (column with unique values) for this table.
Each row of this table contains information about the city and state of one person with ID = PersonId.
 

Write a solution to report the first name, last name, city, and state of each person in the Person table. If the address of a personId is not present in the Address table, report null instead.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Person table:
+----------+----------+-----------+
| personId | lastName | firstName |
+----------+----------+-----------+
| 1        | Wang     | Allen     |
| 2        | Alice    | Bob       |
+----------+----------+-----------+
Address table:
+-----------+----------+---------------+------------+
| addressId | personId | city          | state      |
+-----------+----------+---------------+------------+
| 1         | 2        | New York City | New York   |
| 2         | 3        | Leetcode      | California |
+-----------+----------+---------------+------------+
Output: 
+-----------+----------+---------------+----------+
| firstName | lastName | city          | state    |
+-----------+----------+---------------+----------+
| Allen     | Wang     | Null          | Null     |
| Bob       | Alice    | New York City | New York |
+-----------+----------+---------------+----------+
Explanation: 
There is no address in the address table for the personId = 1 so we return null in their city and state.
addressId = 1 contains information about the address of personId = 2.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema for the 'person' DataFrame
person_schema = StructType([
    StructField("personId", IntegerType(), nullable=False),
    StructField("lastName", StringType(), nullable=True),
    StructField("firstName", StringType(), nullable=True)
])

# Data for 'person' DataFrame
person_data = [
    (1, 'Wang', 'Allen'),
    (2, 'Alice', 'Bob')
]

# Create PySpark DataFrame for 'person'
person_df = spark.createDataFrame(person_data, schema=person_schema)

# Define the schema for the 'address' DataFrame
address_schema = StructType([
    StructField("addressId", IntegerType(), nullable=False),
    StructField("personId", IntegerType(), nullable=False),
    StructField("city", StringType(), nullable=True),
    StructField("state", StringType(), nullable=True)
])

# Data for 'address' DataFrame
address_data = [
    (1, 2, 'New York City', 'New York'),
    (2, 3, 'Leetcode', 'California')
]

# Create PySpark DataFrame for 'address'
address_df = spark.createDataFrame(address_data, schema=address_schema)
person_df.show()
address_df.show()

+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+

+---------+--------+-------------+----------+
|addressId|personId|         city|     state|
+---------+--------+-------------+----------+
|        1|       2|New York City|  New York|
|        2|       3|     Leetcode|California|
+---------+--------+-------------+----------+



In [0]:
#Solution: 
from pyspark.sql.functions import col
person_df.alias('P').join(address_df.alias('A'),col('P.personId') == col('A.personId'),'left')\
    .select(col('firstName'),col('lastName'),col('city'),col('state')).show()


+---------+--------+-------------+--------+
|firstName|lastName|         city|   state|
+---------+--------+-------------+--------+
|    Allen|    Wang|         null|    null|
|      Bob|   Alice|New York City|New York|
+---------+--------+-------------+--------+



## (Premium) 1607. Sellers With No Sales
```
Table: Customer

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| customer_id   | int     |
| customer_name | varchar |
+---------------+---------+
customer_id is the column with unique values for this table.
Each row of this table contains the information of each customer in the WebStore.
 

Table: Orders

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| order_id      | int     |
| sale_date     | date    |
| order_cost    | int     |
| customer_id   | int     |
| seller_id     | int     |
+---------------+---------+
order_id is the column with unique values for this table.
Each row of this table contains all orders made in the webstore.
sale_date is the date when the transaction was made between the customer (customer_id) and the seller (seller_id).
 

Table: Seller

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| seller_id     | int     |
| seller_name   | varchar |
+---------------+---------+
seller_id is the column with unique values for this table.
Each row of this table contains the information of each seller.
 

Write a solution to report the names of all sellers who did not make any sales in 2020.

Return the result table ordered by seller_name in ascending order.

The result format is in the following example.

 

Example 1:

Input: 
Customer table:
+--------------+---------------+
| customer_id  | customer_name |
+--------------+---------------+
| 101          | Alice         |
| 102          | Bob           |
| 103          | Charlie       |
+--------------+---------------+
Orders table:
+-------------+------------+--------------+-------------+-------------+
| order_id    | sale_date  | order_cost   | customer_id | seller_id   |
+-------------+------------+--------------+-------------+-------------+
| 1           | 2020-03-01 | 1500         | 101         | 1           |
| 2           | 2020-05-25 | 2400         | 102         | 2           |
| 3           | 2019-05-25 | 800          | 101         | 3           |
| 4           | 2020-09-13 | 1000         | 103         | 2           |
| 5           | 2019-02-11 | 700          | 101         | 2           |
+-------------+------------+--------------+-------------+-------------+
Seller table:
+-------------+-------------+
| seller_id   | seller_name |
+-------------+-------------+
| 1           | Daniel      |
| 2           | Elizabeth   |
| 3           | Frank       |
+-------------+-------------+
Output: 
+-------------+
| seller_name |
+-------------+
| Frank       |
+-------------+
Explanation: 
Daniel made 1 sale in March 2020.
Elizabeth made 2 sales in 2020 and 1 sale in 2019.
Frank made 1 sale in 2019 but no sales in 2020.

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Define the schema for the 'customer' DataFrame
customer_schema = StructType([
    StructField("customer_id", IntegerType(), nullable=False),
    StructField("customer_name", StringType(), nullable=True)
])

# Data for 'customer' DataFrame
customer_data = [
    (101, 'Alice'),
    (102, 'Bob'),
    (103, 'Charlie')
]

# Create PySpark DataFrame for 'customer'
customer_df = spark.createDataFrame(customer_data, schema=customer_schema)

# Define the schema for the 'orders' DataFrame
orders_schema = StructType([
    StructField("order_id", IntegerType(), nullable=False),
    StructField("sale_date", StringType(), nullable=True),
    StructField("order_cost", IntegerType(), nullable=True),
    StructField("customer_id", IntegerType(), nullable=True),
    StructField("seller_id", IntegerType(), nullable=True)
])

# Data for 'orders' DataFrame
orders_data = [
    (1, '2020-03-01', 1500, 101, 1),
    (2, '2020-05-25', 2400, 102, 2),
    (3, '2019-05-25', 800, 101, 3),
    (4, '2020-09-13', 1000, 103, 2),
    (5, '2019-02-11', 700, 101, 2)
]

# Create PySpark DataFrame for 'orders'
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)

# Define the schema for the 'seller' DataFrame
seller_schema = StructType([
    StructField("seller_id", IntegerType(), nullable=False),
    StructField("seller_name", StringType(), nullable=True)
])

# Data for 'seller' DataFrame
seller_data = [
    (1, 'Daniel'),
    (2, 'Elizabeth'),
    (3, 'Frank')
]

# Create PySpark DataFrame for 'seller'
seller_df = spark.createDataFrame(seller_data, schema=seller_schema)

customer_df.show()
orders_df.show()
seller_df.show()

+-----------+-------------+
|customer_id|customer_name|
+-----------+-------------+
|        101|        Alice|
|        102|          Bob|
|        103|      Charlie|
+-----------+-------------+

+--------+----------+----------+-----------+---------+
|order_id| sale_date|order_cost|customer_id|seller_id|
+--------+----------+----------+-----------+---------+
|       1|2020-03-01|      1500|        101|        1|
|       2|2020-05-25|      2400|        102|        2|
|       3|2019-05-25|       800|        101|        3|
|       4|2020-09-13|      1000|        103|        2|
|       5|2019-02-11|       700|        101|        2|
+--------+----------+----------+-----------+---------+

+---------+-----------+
|seller_id|seller_name|
+---------+-----------+
|        1|     Daniel|
|        2|  Elizabeth|
|        3|      Frank|
+---------+-----------+



In [0]:
from pyspark.sql.functions import *
df = seller_df.alias('S').join(orders_df.alias('A'), col('S.seller_id') == col('A.seller_id'),'left').filter((year(col('sale_date')) == '2020')).select(col('S.seller_id'))
seller_ids_2020 = [row.seller_id for row in df.collect()]
seller_df.filter(~col('seller_id').isin(seller_ids_2020)).select(col('seller_name')).show()

+-----------+
|seller_name|
+-----------+
|      Frank|
+-----------+




## 1407. Top Travellers
```
Table: Users

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| id            | int     |
| name          | varchar |
+---------------+---------+
id is the column with unique values for this table.
name is the name of the user.
 

Table: Rides

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| id            | int     |
| user_id       | int     |
| distance      | int     |
+---------------+---------+
id is the column with unique values for this table.
user_id is the id of the user who traveled the distance "distance".
 

Write a solution to report the distance traveled by each user.

Return the result table ordered by travelled_distance in descending order, if two or more users traveled the same distance, order them by their name in ascending order.

The result format is in the following example.

 

Example 1:

Input: 
Users table:
+------+-----------+
| id   | name      |
+------+-----------+
| 1    | Alice     |
| 2    | Bob       |
| 3    | Alex      |
| 4    | Donald    |
| 7    | Lee       |
| 13   | Jonathan  |
| 19   | Elvis     |
+------+-----------+
Rides table:
+------+----------+----------+
| id   | user_id  | distance |
+------+----------+----------+
| 1    | 1        | 120      |
| 2    | 2        | 317      |
| 3    | 3        | 222      |
| 4    | 7        | 100      |
| 5    | 13       | 312      |
| 6    | 19       | 50       |
| 7    | 7        | 120      |
| 8    | 19       | 400      |
| 9    | 7        | 230      |
+------+----------+----------+
Output: 
+----------+--------------------+
| name     | travelled_distance |
+----------+--------------------+
| Elvis    | 450                |
| Lee      | 450                |
| Bob      | 317                |
| Jonathan | 312                |
| Alex     | 222                |
| Alice    | 120                |
| Donald   | 0                  |
+----------+--------------------+
Explanation: 
Elvis and Lee traveled 450 miles, Elvis is the top traveler as his name is alphabetically smaller than Lee.
Bob, Jonathan, Alex, and Alice have only one ride and we just order them by the total distances of the ride.
Donald did not have any rides, the distance traveled by him is 0.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Define data
data_users = [(1, 'Alice'), (2, 'Bob'), (3, 'Alex'), (4, 'Donald'), (7, 'Lee'), (13, 'Jonathan'), (19, 'Elvis')]
data_rides = [(1, 1, 120), (2, 2, 317), (3, 3, 222), (4, 7, 100), (5, 13, 312), (6, 19, 50), (7, 7, 120), (8, 19, 400), (9, 7, 230)]

# Create PySpark DataFrames
users_df = spark.createDataFrame(data_users, schema=['id', 'name'])
rides_df = spark.createDataFrame(data_rides, schema=['id', 'user_id', 'distance'])

# Show DataFrames
users_df.show()
rides_df.show()


+---+--------+
| id|    name|
+---+--------+
|  1|   Alice|
|  2|     Bob|
|  3|    Alex|
|  4|  Donald|
|  7|     Lee|
| 13|Jonathan|
| 19|   Elvis|
+---+--------+

+---+-------+--------+
| id|user_id|distance|
+---+-------+--------+
|  1|      1|     120|
|  2|      2|     317|
|  3|      3|     222|
|  4|      7|     100|
|  5|     13|     312|
|  6|     19|      50|
|  7|      7|     120|
|  8|     19|     400|
|  9|      7|     230|
+---+-------+--------+



In [0]:
rides_df.alias('R').join(users_df.alias('U'), col('R.user_id') == col('U.id'),'right')\
    .groupBy(col('R.user_id'),col('U.name'))\
        .agg(sum(col('distance')).alias('travelled_distance'))\
        .select(col('U.name'),col('travelled_distance')).fillna(0)\
            .orderBy(col('travelled_distance').desc(),col('U.name')).show()

+--------+------------------+
|    name|travelled_distance|
+--------+------------------+
|   Elvis|               450|
|     Lee|               450|
|     Bob|               317|
|Jonathan|               312|
|    Alex|               222|
|   Alice|               120|
|  Donald|                 0|
+--------+------------------+



## 607. Sales Person
```
Table: SalesPerson

+-----------------+---------+
| Column Name     | Type    |
+-----------------+---------+
| sales_id        | int     |
| name            | varchar |
| salary          | int     |
| commission_rate | int     |
| hire_date       | date    |
+-----------------+---------+
sales_id is the primary key (column with unique values) for this table.
Each row of this table indicates the name and the ID of a salesperson alongside their salary, commission rate, and hire date.
 

Table: Company

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| com_id      | int     |
| name        | varchar |
| city        | varchar |
+-------------+---------+
com_id is the primary key (column with unique values) for this table.
Each row of this table indicates the name and the ID of a company and the city in which the company is located.
 

Table: Orders

+-------------+------+
| Column Name | Type |
+-------------+------+
| order_id    | int  |
| order_date  | date |
| com_id      | int  |
| sales_id    | int  |
| amount      | int  |
+-------------+------+
order_id is the primary key (column with unique values) for this table.
com_id is a foreign key (reference column) to com_id from the Company table.
sales_id is a foreign key (reference column) to sales_id from the SalesPerson table.
Each row of this table contains information about one order. This includes the ID of the company, the ID of the salesperson, the date of the order, and the amount paid.
 

Write a solution to find the names of all the salespersons who did not have any orders related to the company with the name "RED".

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
SalesPerson table:
+----------+------+--------+-----------------+------------+
| sales_id | name | salary | commission_rate | hire_date  |
+----------+------+--------+-----------------+------------+
| 1        | John | 100000 | 6               | 4/1/2006   |
| 2        | Amy  | 12000  | 5               | 5/1/2010   |
| 3        | Mark | 65000  | 12              | 12/25/2008 |
| 4        | Pam  | 25000  | 25              | 1/1/2005   |
| 5        | Alex | 5000   | 10              | 2/3/2007   |
+----------+------+--------+-----------------+------------+
Company table:
+--------+--------+----------+
| com_id | name   | city     |
+--------+--------+----------+
| 1      | RED    | Boston   |
| 2      | ORANGE | New York |
| 3      | YELLOW | Boston   |
| 4      | GREEN  | Austin   |
+--------+--------+----------+
Orders table:
+----------+------------+--------+----------+--------+
| order_id | order_date | com_id | sales_id | amount |
+----------+------------+--------+----------+--------+
| 1        | 1/1/2014   | 3      | 4        | 10000  |
| 2        | 2/1/2014   | 4      | 5        | 5000   |
| 3        | 3/1/2014   | 1      | 1        | 50000  |
| 4        | 4/1/2014   | 1      | 4        | 25000  |
+----------+------------+--------+----------+--------+
Output: 
+------+
| name |
+------+
| Amy  |
| Mark |
| Alex |
+------+
Explanation: 
According to orders 3 and 4 in the Orders table, it is easy to tell that only salesperson John and Pam have sales to company RED, so we report all the other names in the table salesperson.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("Example").getOrCreate()

# Define the data
sales_person_data = [
    (1, 'John', 100000, 6, '2006-04-01'),
    (2, 'Amy', 12000, 5, '2010-05-01'),
    (3, 'Mark', 65000, 12, '2008-12-25'),
    (4, 'Pam', 25000, 25, '2005-01-01'),
    (5, 'Alex', 5000, 10, '2007-02-03')
]

company_data = [
    (1, 'RED', 'Boston'),
    (2, 'ORANGE', 'New York'),
    (3, 'YELLOW', 'Boston'),
    (4, 'GREEN', 'Austin')
]

orders_data = [
    (1, '2014-01-01', 3, 4, 10000),
    (2, '2014-02-01', 4, 5, 5000),
    (3, '2014-03-01', 1, 1, 50000),
    (4, '2014-04-01', 1, 4, 25000)
]

# Create DataFrames
sales_person_df = spark.createDataFrame(sales_person_data, ['sales_id', 'name', 'salary', 'commission_rate', 'hire_date'])
company_df = spark.createDataFrame(company_data, ['com_id', 'name', 'city'])
orders_df = spark.createDataFrame(orders_data, ['order_id', 'order_date', 'com_id', 'sales_id', 'amount'])

# Convert columns to appropriate types
sales_person_df = sales_person_df.withColumn('hire_date', col('hire_date').cast('date'))
orders_df = orders_df.withColumn('order_date', col('order_date').cast('date'))

# Show DataFrames
sales_person_df.show()
company_df.show()
orders_df.show()


+--------+----+------+---------------+----------+
|sales_id|name|salary|commission_rate| hire_date|
+--------+----+------+---------------+----------+
|       1|John|100000|              6|2006-04-01|
|       2| Amy| 12000|              5|2010-05-01|
|       3|Mark| 65000|             12|2008-12-25|
|       4| Pam| 25000|             25|2005-01-01|
|       5|Alex|  5000|             10|2007-02-03|
+--------+----+------+---------------+----------+

+------+------+--------+
|com_id|  name|    city|
+------+------+--------+
|     1|   RED|  Boston|
|     2|ORANGE|New York|
|     3|YELLOW|  Boston|
|     4| GREEN|  Austin|
+------+------+--------+

+--------+----------+------+--------+------+
|order_id|order_date|com_id|sales_id|amount|
+--------+----------+------+--------+------+
|       1|2014-01-01|     3|       4| 10000|
|       2|2014-02-01|     4|       5|  5000|
|       3|2014-03-01|     1|       1| 50000|
|       4|2014-04-01|     1|       4| 25000|
+--------+----------+------+-----

In [0]:
# Write a solution to find the names of all the salespersons who did not have any orders related to the company with the name "RED".
# Return the result table in any order.
#Solution:

df = sales_person_df.alias('S').join(orders_df.alias('O'), col('O.sales_id') == col('S.sales_id'),'left')\
    .join(company_df.alias('C'), col('C.com_id') == col('O.com_id'),'left').filter(col('C.name') == 'RED').select(col('S.sales_id').alias('Sales'))

sales_person_df.filter(~col('sales_id').isin(df.select('Sales').rdd.flatMap(lambda x: x).collect())).select(col('name')).show()


+----+
|name|
+----+
| Amy|
|Mark|
|Alex|
+----+



## (Premium) 1440. Evaluate Boolean Expression. 
```
Table Variables:

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| name          | varchar |
| value         | int     |
+---------------+---------+
In SQL, name is the primary key for this table.
This table contains the stored variables and their values.
 

Table Expressions:

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| left_operand  | varchar |
| operator      | enum    |
| right_operand | varchar |
+---------------+---------+
In SQL, (left_operand, operator, right_operand) is the primary key for this table.
This table contains a boolean expression that should be evaluated.
operator is an enum that takes one of the values ('<', '>', '=')
The values of left_operand and right_operand are guaranteed to be in the Variables table.
 

Evaluate the boolean expressions in Expressions table.

Return the result table in any order.

The result format is in the following example.

 
Example 1:

Input: 
Variables table:
+------+-------+
| name | value |
+------+-------+
| x    | 66    |
| y    | 77    |
+------+-------+
Expressions table:
+--------------+----------+---------------+
| left_operand | operator | right_operand |
+--------------+----------+---------------+
| x            | >        | y             |
| x            | <        | y             |
| x            | =        | y             |
| y            | >        | x             |
| y            | <        | x             |
| x            | =        | x             |
+--------------+----------+---------------+
Output: 
+--------------+----------+---------------+-------+
| left_operand | operator | right_operand | value |
+--------------+----------+---------------+-------+
| x            | >        | y             | false |
| x            | <        | y             | true  |
| x            | =        | y             | false |
| y            | >        | x             | true  |
| y            | <        | x             | false |
| x            | =        | x             | true  |
+--------------+----------+---------------+-------+
Explanation: 
As shown, you need to find the value of each boolean expression in the table using the variables table.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Define schema for variables DataFrame
variables_schema = StructType([
    StructField("name", StringType(), True),
    StructField("value", IntegerType(), True)
])

# Define data for variables DataFrame
variables_data = [['x', 66], ['y', 77]]

# Create DataFrame
variables_df = spark.createDataFrame(variables_data, schema=variables_schema)

# Define schema for expressions DataFrame
expressions_schema = StructType([
    StructField("left_operand", StringType(), True),
    StructField("operator", StringType(), True),
    StructField("right_operand", StringType(), True)
])

# Define data for expressions DataFrame
expressions_data = [['x', '>', 'y'], ['x', '<', 'y'], ['x', '=', 'y'], ['y', '>', 'x'], ['y', '<', 'x'], ['x', '=', 'x']]

# Create DataFrame
expressions_df = spark.createDataFrame(expressions_data, schema=expressions_schema)

# Show the DataFrames
print("Variables DataFrame:")
variables_df.show()

print("Expressions DataFrame:")
expressions_df.show()


Variables DataFrame:
+----+-----+
|name|value|
+----+-----+
|   x|   66|
|   y|   77|
+----+-----+

Expressions DataFrame:
+------------+--------+-------------+
|left_operand|operator|right_operand|
+------------+--------+-------------+
|           x|       >|            y|
|           x|       <|            y|
|           x|       =|            y|
|           y|       >|            x|
|           y|       <|            x|
|           x|       =|            x|
+------------+--------+-------------+



In [0]:
expressions_df.alias('E').join(variables_df.alias('V1'),col('E.left_operand') == col('V1.name'),'inner')\
        .join(variables_df.alias('V2'),col('E.right_operand') == col('V2.name'),'inner')\
            .withColumn('fValue',when(col('operator') == '<',col('V1.value') < col('V2.value'))\
                .when(col('operator') == '>',col('V1.value') > col('V2.value'))\
                .when(col('operator') == '=',col('V1.value') == col('V2.value'))
                ).select(col('left_operand'),col('E.operator'),col('right_operand'),col('fValue')).show()

+------------+--------+-------------+------+
|left_operand|operator|right_operand|fValue|
+------------+--------+-------------+------+
|           y|       <|            x| false|
|           y|       >|            x|  true|
|           x|       =|            x|  true|
|           x|       =|            y| false|
|           x|       <|            y|  true|
|           x|       >|            y| false|
+------------+--------+-------------+------+



## (Premium) 1212. Team Scores in Football Tournament
```
Table: Teams

+---------------+----------+
| Column Name   | Type     |
+---------------+----------+
| team_id       | int      |
| team_name     | varchar  |
+---------------+----------+
team_id is the column with unique values of this table.
Each row of this table represents a single football team.
 

Table: Matches

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| match_id      | int     |
| host_team     | int     |
| guest_team    | int     | 
| host_goals    | int     |
| guest_goals   | int     |
+---------------+---------+
match_id is the column of unique values of this table.
Each row is a record of a finished match between two different teams. 
Teams host_team and guest_team are represented by their IDs in the Teams table (team_id), and they scored host_goals and guest_goals goals, respectively.
 

You would like to compute the scores of all teams after all matches. Points are awarded as follows:
A team receives three points if they win a match (i.e., Scored more goals than the opponent team).
A team receives one point if they draw a match (i.e., Scored the same number of goals as the opponent team).
A team receives no points if they lose a match (i.e., Scored fewer goals than the opponent team).
Write a solution that selects the team_id, team_name and num_points of each team in the tournament after all described matches.

Return the result table ordered by num_points in decreasing order. In case of a tie, order the records by team_id in increasing order.

The result format is in the following example.

 

Example 1:

Input: 
Teams table:
+-----------+--------------+
| team_id   | team_name    |
+-----------+--------------+
| 10        | Leetcode FC  |
| 20        | NewYork FC   |
| 30        | Atlanta FC   |
| 40        | Chicago FC   |
| 50        | Toronto FC   |
+-----------+--------------+
Matches table:
+------------+--------------+---------------+-------------+--------------+
| match_id   | host_team    | guest_team    | host_goals  | guest_goals  |
+------------+--------------+---------------+-------------+--------------+
| 1          | 10           | 20            | 3           | 0            |
| 2          | 30           | 10            | 2           | 2            |
| 3          | 10           | 50            | 5           | 1            |
| 4          | 20           | 30            | 1           | 0            |
| 5          | 50           | 30            | 1           | 0            |
+------------+--------------+---------------+-------------+--------------+
Output: 
+------------+--------------+---------------+
| team_id    | team_name    | num_points    |
+------------+--------------+---------------+
| 10         | Leetcode FC  | 7             |
| 20         | NewYork FC   | 3             |
| 50         | Toronto FC   | 3             |
| 30         | Atlanta FC   | 1             |
| 40         | Chicago FC   | 0             |
+------------+--------------+---------------+
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Create Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Define schema for teams
teams_schema = StructType([
    StructField('team_id', IntegerType(), True),
    StructField('team_name', StringType(), True)
])

# Create DataFrame for teams
teams_data = [
    (10, 'Leetcode FC'),
    (20, 'NewYork FC'),
    (30, 'Atlanta FC'),
    (40, 'Chicago FC'),
    (50, 'Toronto FC')
]
teams_df = spark.createDataFrame(teams_data, schema=teams_schema)

# Define schema for matches
matches_schema = StructType([
    StructField('match_id', IntegerType(), True),
    StructField('host_team', IntegerType(), True),
    StructField('guest_team', IntegerType(), True),
    StructField('host_goals', IntegerType(), True),
    StructField('guest_goals', IntegerType(), True)
])

# Create DataFrame for matches
matches_data = [
    (1, 10, 20, 3, 0),
    (2, 30, 10, 2, 2),
    (3, 10, 50, 5, 1),
    (4, 20, 30, 1, 0),
    (5, 50, 30, 1, 0)
]
matches_df = spark.createDataFrame(matches_data, schema=matches_schema)

# Show the DataFrames
teams_df.show()
matches_df.show()


+-------+-----------+
|team_id|  team_name|
+-------+-----------+
|     10|Leetcode FC|
|     20| NewYork FC|
|     30| Atlanta FC|
|     40| Chicago FC|
|     50| Toronto FC|
+-------+-----------+

+--------+---------+----------+----------+-----------+
|match_id|host_team|guest_team|host_goals|guest_goals|
+--------+---------+----------+----------+-----------+
|       1|       10|        20|         3|          0|
|       2|       30|        10|         2|          2|
|       3|       10|        50|         5|          1|
|       4|       20|        30|         1|          0|
|       5|       50|        30|         1|          0|
+--------+---------+----------+----------+-----------+



In [0]:
df1 = teams_df.alias('T').join(matches_df.alias('M'), col('T.team_id') == col('M.host_team'), 'left')\
    .fillna(0)\
    .withColumn('num_points', when(col('host_goals') > col('guest_goals'), 3)
                              .when(col('host_goals') < col('guest_goals'), 0)
                              .when(col('host_goals') == col('guest_goals'), 1)
                              .otherwise(0))\
    .select(col('host_team').alias('team_id'), col('T.team_name'), col('num_points'))

# Compute points for guest teams
df2 = teams_df.alias('T').join(matches_df.alias('M'), col('T.team_id') == col('M.guest_team'), 'left')\
    .fillna(0)\
    .withColumn('num_points', when(col('guest_goals') > col('host_goals'), 3)
                              .when(col('guest_goals') < col('host_goals'), 0)
                              .when(col('guest_goals') == col('host_goals'), 1)
                              .otherwise(0))\
    .select(col('guest_team').alias('team_id'), col('T.team_name'), col('num_points'))

# Combine host and guest results
df3 = df1.union(df2)

# Aggregate points by team_id and team_name
result_df = df3.groupBy(col('team_id'), col('team_name'))\
    .agg(coalesce(sum(col('num_points')), lit(0)).alias('num_points'))\
    .orderBy(col('num_points').desc(), col('team_id'))

# Show the result
result_df.show()

+-------+-----------+----------+
|team_id|  team_name|num_points|
+-------+-----------+----------+
|     10|Leetcode FC|         7|
|     20| NewYork FC|         3|
|     50| Toronto FC|         3|
|      0| Chicago FC|         2|
|     30| Atlanta FC|         1|
+-------+-----------+----------+



# Basic Aggregate Functions

## 1890. The Latest Login in 2020
```
Table: Logins

+----------------+----------+
| Column Name    | Type     |
+----------------+----------+
| user_id        | int      |
| time_stamp     | datetime |
+----------------+----------+
(user_id, time_stamp) is the primary key (combination of columns with unique values) for this table.
Each row contains information about the login time for the user with ID user_id.
 

Write a solution to report the latest login for all users in the year 2020. Do not include the users who did not login in 2020.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Logins table:
+---------+---------------------+
| user_id | time_stamp          |
+---------+---------------------+
| 6       | 2020-06-30 15:06:07 |
| 6       | 2021-04-21 14:06:06 |
| 6       | 2019-03-07 00:18:15 |
| 8       | 2020-02-01 05:10:53 |
| 8       | 2020-12-30 00:46:50 |
| 2       | 2020-01-16 02:49:50 |
| 2       | 2019-08-25 07:59:08 |
| 14      | 2019-07-14 09:00:00 |
| 14      | 2021-01-06 11:59:59 |
+---------+---------------------+
Output: 
+---------+---------------------+
| user_id | last_stamp          |
+---------+---------------------+
| 6       | 2020-06-30 15:06:07 |
| 8       | 2020-12-30 00:46:50 |
| 2       | 2020-01-16 02:49:50 |
+---------+---------------------+
Explanation: 
User 6 logged into their account 3 times but only once in 2020, so we include this login in the result table.
User 8 logged into their account 2 times in 2020, once in February and once in December. We include only the latest one (December) in the result table.
User 2 logged into their account 2 times but only once in 2020, so we include this login in the result table.
User 14 did not login in 2020, so we do not include them in the result table.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from datetime import datetime

# Create Spark session
spark = SparkSession.builder.appName("LoginsExample").getOrCreate()

# Define the schema for the DataFrame
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("time_stamp", TimestampType(), True)
])

# Create the data
data = [
    (6, datetime.strptime('2020-06-30 15:06:07', '%Y-%m-%d %H:%M:%S')),
    (6, datetime.strptime('2021-04-21 14:06:06', '%Y-%m-%d %H:%M:%S')),
    (6, datetime.strptime('2019-03-07 00:18:15', '%Y-%m-%d %H:%M:%S')),
    (8, datetime.strptime('2020-02-01 05:10:53', '%Y-%m-%d %H:%M:%S')),
    (8, datetime.strptime('2020-12-30 00:46:50', '%Y-%m-%d %H:%M:%S')),
    (2, datetime.strptime('2020-01-16 02:49:50', '%Y-%m-%d %H:%M:%S')),
    (2, datetime.strptime('2019-08-25 07:59:08', '%Y-%m-%d %H:%M:%S')),
    (14, datetime.strptime('2019-07-14 09:00:00', '%Y-%m-%d %H:%M:%S')),
    (14, datetime.strptime('2021-01-06 11:59:59', '%Y-%m-%d %H:%M:%S'))
]

# Create the DataFrame
logins_df = spark.createDataFrame(data, schema)

# Show the DataFrame
logins_df.show()


+-------+-------------------+
|user_id|         time_stamp|
+-------+-------------------+
|      6|2020-06-30 15:06:07|
|      6|2021-04-21 14:06:06|
|      6|2019-03-07 00:18:15|
|      8|2020-02-01 05:10:53|
|      8|2020-12-30 00:46:50|
|      2|2020-01-16 02:49:50|
|      2|2019-08-25 07:59:08|
|     14|2019-07-14 09:00:00|
|     14|2021-01-06 11:59:59|
+-------+-------------------+



In [0]:
# Write a solution to report the latest login for all users in the year 2020. Do not include the users who did not login in 2020.

# Return the result table in any order.

# The result format is in the following example.


# Initial thoughts:
#     Filter 2020 data and apply window function row_number partition by user_id  time_stamp order by desc
#     and select where row_number is 1
from pyspark.sql.functions import *
from pyspark.sql.window import Window
windowOptions = Window.partitionBy(col('user_id')).orderBy(col('time_stamp').desc())
logins_df.filter(year(col('time_stamp')) == 2020).withColumn('RN',row_number().over(windowOptions))\
    .filter(col('RN') == 1).drop(col('RN')).show()

+-------+-------------------+
|user_id|         time_stamp|
+-------+-------------------+
|      2|2020-01-16 02:49:50|
|      6|2020-06-30 15:06:07|
|      8|2020-12-30 00:46:50|
+-------+-------------------+



## 511. Game Play Analysis I
```
Table: Activity

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| player_id    | int     |
| device_id    | int     |
| event_date   | date    |
| games_played | int     |
+--------------+---------+
(player_id, event_date) is the primary key (combination of columns with unique values) of this table.
This table shows the activity of players of some games.
Each row is a record of a player who logged in and played a number of games (possibly 0) before logging out on someday using some device.
 

Write a solution to find the first login date for each player.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Activity table:
+-----------+-----------+------------+--------------+
| player_id | device_id | event_date | games_played |
+-----------+-----------+------------+--------------+
| 1         | 2         | 2016-03-01 | 5            |
| 1         | 2         | 2016-05-02 | 6            |
| 2         | 3         | 2017-06-25 | 1            |
| 3         | 1         | 2016-03-02 | 0            |
| 3         | 4         | 2018-07-03 | 5            |
+-----------+-----------+------------+--------------+
Output: 
+-----------+-------------+
| player_id | first_login |
+-----------+-------------+
| 1         | 2016-03-01  |
| 2         | 2017-06-25  |
| 3         | 2016-03-02  |
+-----------+-------------+
```

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from pyspark.sql import SparkSession

# Define schema
schema = StructType([
    StructField("player_id", IntegerType(), True),
    StructField("device_id", IntegerType(), True),
    StructField("event_date", StringType(), True),
    StructField("games_played", IntegerType(), True)
])

# Create data
data = [
    (1, 2, '2016-03-01', 5),
    (1, 2, '2016-05-02', 6),
    (2, 3, '2017-06-25', 1),
    (3, 1, '2016-03-02', 0),
    (3, 4, '2018-07-03', 5)
]

# Create PySpark DataFrame
activity_df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
activity_df.show()


+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-05-02|           6|
|        2|        3|2017-06-25|           1|
|        3|        1|2016-03-02|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



In [0]:
# Write a solution to find the first login date for each player.

# Return the result table in any order.

# Initial thoughts:
# 1.Use window function rowNumber and order by event_date asc as first login
# 2.filter based on rowNumber 1

windowOptions = Window.partitionBy(col('player_id')).orderBy(col('event_date'))
activity_df.withColumn('RN', row_number().over(windowOptions)).filter(col('RN') == 1)\
    .select(col('player_id'),col('event_date').alias('firstLogin')).show()

+---------+----------+
|player_id|firstLogin|
+---------+----------+
|        1|2016-03-01|
|        2|2017-06-25|
|        3|2016-03-02|
+---------+----------+



## (Premium) 1571. Warehouse Manager
```
Table: Warehouse

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| name         | varchar |
| product_id   | int     |
| units        | int     |
+--------------+---------+
(name, product_id) is the primary key (combination of columns with unique values) for this table.
Each row of this table contains the information of the products in each warehouse.
 

Table: Products

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| product_id    | int     |
| product_name  | varchar |
| Width         | int     |
| Length        | int     |
| Height        | int     |
+---------------+---------+
product_id is the primary key (column with unique values) for this table.
Each row of this table contains information about the product dimensions (Width, Lenght, and Height) in feets of each product.
 

Write a solution to report the number of cubic feet of volume the inventory occupies in each warehouse.

Return the result table in any order.

The query result format is in the following example.

 

Example 1:

Input: 
Warehouse table:
+------------+--------------+-------------+
| name       | product_id   | units       |
+------------+--------------+-------------+
| LCHouse1   | 1            | 1           |
| LCHouse1   | 2            | 10          |
| LCHouse1   | 3            | 5           |
| LCHouse2   | 1            | 2           |
| LCHouse2   | 2            | 2           |
| LCHouse3   | 4            | 1           |
+------------+--------------+-------------+
Products table:
+------------+--------------+------------+----------+-----------+
| product_id | product_name | Width      | Length   | Height    |
+------------+--------------+------------+----------+-----------+
| 1          | LC-TV        | 5          | 50       | 40        |
| 2          | LC-KeyChain  | 5          | 5        | 5         |
| 3          | LC-Phone     | 2          | 10       | 10        |
| 4          | LC-T-Shirt   | 4          | 10       | 20        |
+------------+--------------+------------+----------+-----------+
Output: 
+----------------+------------+
| warehouse_name | volume     | 
+----------------+------------+
| LCHouse1       | 12250      | 
| LCHouse2       | 20250      |
| LCHouse3       | 800        |
+----------------+------------+
Explanation: 
Volume of product_id = 1 (LC-TV), 5x50x40 = 10000
Volume of product_id = 2 (LC-KeyChain), 5x5x5 = 125 
Volume of product_id = 3 (LC-Phone), 2x10x10 = 200
Volume of product_id = 4 (LC-T-Shirt), 4x10x20 = 800
LCHouse1: 1 unit of LC-TV + 10 units of LC-KeyChain + 5 units of LC-Phone.
          Total volume: 1*10000 + 10*125  + 5*200 = 12250 cubic feet
LCHouse2: 2 units of LC-TV + 2 units of LC-KeyChain.
          Total volume: 2*10000 + 2*125 = 20250 cubic feet
LCHouse3: 1 unit of LC-T-Shirt.
          Total volume: 1*800 = 800 cubic feet.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Assuming spark session is already created
# spark = SparkSession.builder.getOrCreate()

# Data for warehouse
warehouse_data = [
    ('LCHouse1', 1, 1), 
    ('LCHouse1', 2, 10), 
    ('LCHouse1', 3, 5), 
    ('LCHouse2', 1, 2), 
    ('LCHouse2', 2, 2), 
    ('LCHouse3', 4, 1)
]

warehouse_schema = StructType([
    StructField('name', StringType(), True),
    StructField('product_id', IntegerType(), True),
    StructField('units', IntegerType(), True)
])

warehouse_df = spark.createDataFrame(warehouse_data, schema=warehouse_schema)

# Data for products
products_data = [
    (1, 'LC-TV', 5, 50, 40),
    (2, 'LC-KeyChain', 5, 5, 5),
    (3, 'LC-Phone', 2, 10, 10),
    (4, 'LC-T-Shirt', 4, 10, 20)
]

products_schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('Width', IntegerType(), True),
    StructField('Length', IntegerType(), True),
    StructField('Height', IntegerType(), True)
])

products_df = spark.createDataFrame(products_data, schema=products_schema)

# Display the DataFrames
warehouse_df.show()
products_df.show()


+--------+----------+-----+
|    name|product_id|units|
+--------+----------+-----+
|LCHouse1|         1|    1|
|LCHouse1|         2|   10|
|LCHouse1|         3|    5|
|LCHouse2|         1|    2|
|LCHouse2|         2|    2|
|LCHouse3|         4|    1|
+--------+----------+-----+

+----------+------------+-----+------+------+
|product_id|product_name|Width|Length|Height|
+----------+------------+-----+------+------+
|         1|       LC-TV|    5|    50|    40|
|         2| LC-KeyChain|    5|     5|     5|
|         3|    LC-Phone|    2|    10|    10|
|         4|  LC-T-Shirt|    4|    10|    20|
+----------+------------+-----+------+------+



In [0]:
# Write a solution to report the number of cubic feet of volume the inventory occupies in each warehouse.

# Return the result table in any order.

# 1.Here we ar trying to find the total space will be occupied by the products.
# 2.first we left join warehouse with product then multiple the width * length * height
# 4. mutiply the toal value with units in warehouse
# 5. return warehouse and its total volume. Group by warehouse and sum the final value.
# 6.warehouse, volumns as output.

warehouse_df.alias('W').join(products_df.alias('P'), col('W.product_id') == col('P.product_id'),'left')\
    .withColumn('volume',( col('Width') * col('Length') * col('Height')) * col('units'))\
        .groupBy(col('name')).agg(sum('volume').alias('volume')).show()


+--------+------+
|    name|volume|
+--------+------+
|LCHouse1| 12250|
|LCHouse2| 20250|
|LCHouse3|   800|
+--------+------+



## 586. Customer Placing the Largest Number of Orders
```
Table: Orders

+-----------------+----------+
| Column Name     | Type     |
+-----------------+----------+
| order_number    | int      |
| customer_number | int      |
+-----------------+----------+
order_number is the primary key (column with unique values) for this table.
This table contains information about the order ID and the customer ID.
 

Write a solution to find the customer_number for the customer who has placed the largest number of orders.

The test cases are generated so that exactly one customer will have placed more orders than any other customer.

The result format is in the following example.

 

Example 1:

Input: 
Orders table:
+--------------+-----------------+
| order_number | customer_number |
+--------------+-----------------+
| 1            | 1               |
| 2            | 2               |
| 3            | 3               |
| 4            | 3               |
+--------------+-----------------+
Output: 
+-----------------+
| customer_number |
+-----------------+
| 3               |
+-----------------+
Explanation: 
The customer with number 3 has two orders, which is greater than either customer 1 or 2 because each of them only has one order. 
So the result is customer_number 3.
 

Follow up: What if more than one customer has the largest number of orders, can you find all the customer_number in this case?
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Assuming spark session is already created
# spark = SparkSession.builder.getOrCreate()

# Data for orders
orders_data = [
    (1, 1),
    (2, 2),
    (3, 3),
    (4, 3)
]

orders_schema = StructType([
    StructField('order_number', IntegerType(), True),
    StructField('customer_number', IntegerType(), True)
])

orders_df = spark.createDataFrame(orders_data, schema=orders_schema)

# Display the DataFrame
orders_df.show()


+------------+---------------+
|order_number|customer_number|
+------------+---------------+
|           1|              1|
|           2|              2|
|           3|              3|
|           4|              3|
+------------+---------------+



In [0]:
# Write a solution to find the customer_number for the customer who has placed the largest number of orders.

# The test cases are generated so that exactly one customer will have placed more orders than any other customer.

# Initial thought:
# 1.group by customer order by count desc limit 1 or max count

from pyspark.sql.functions import *

orders_df.groupBy(col('customer_number')).agg(count(col('order_number')).alias('noOfOrders'))\
    .orderBy(col('noOfOrders').desc()).select(col('customer_number')).limit(1).show()

+---------------+
|customer_number|
+---------------+
|              3|
+---------------+



## 1741. Find Total Time Spent by Each Employee
```
Table: Employees

+-------------+------+
| Column Name | Type |
+-------------+------+
| emp_id      | int  |
| event_day   | date |
| in_time     | int  |
| out_time    | int  |
+-------------+------+
(emp_id, event_day, in_time) is the primary key (combinations of columns with unique values) of this table.
The table shows the employees' entries and exits in an office.
event_day is the day at which this event happened, in_time is the minute at which the employee entered the office, and out_time is the minute at which they left the office.
in_time and out_time are between 1 and 1440.
It is guaranteed that no two events on the same day intersect in time, and in_time < out_time.
 

Write a solution to calculate the total time in minutes spent by each employee on each day at the office. Note that within one day, an employee can enter and leave more than once. The time spent in the office for a single entry is out_time - in_time.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Employees table:
+--------+------------+---------+----------+
| emp_id | event_day  | in_time | out_time |
+--------+------------+---------+----------+
| 1      | 2020-11-28 | 4       | 32       |
| 1      | 2020-11-28 | 55      | 200      |
| 1      | 2020-12-03 | 1       | 42       |
| 2      | 2020-11-28 | 3       | 33       |
| 2      | 2020-12-09 | 47      | 74       |
+--------+------------+---------+----------+
Output: 
+------------+--------+------------+
| day        | emp_id | total_time |
+------------+--------+------------+
| 2020-11-28 | 1      | 173        |
| 2020-11-28 | 2      | 30         |
| 2020-12-03 | 1      | 41         |
| 2020-12-09 | 2      | 27         |
+------------+--------+------------+
Explanation: 
Employee 1 has three events: two on day 2020-11-28 with a total of (32 - 4) + (200 - 55) = 173, and one on day 2020-12-03 with a total of (42 - 1) = 41.
Employee 2 has two events: one on day 2020-11-28 with a total of (33 - 3) = 30, and one on day 2020-12-09 with a total of (74 - 47) = 27.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Assuming spark session is already created
# spark = SparkSession.builder.getOrCreate()

# Data for employees
employees_data = [
    (1, '2020-11-28', 4, 32),
    (1, '2020-11-28', 55, 200),
    (1, '2020-12-03', 1, 42),
    (2, '2020-11-28', 3, 33),
    (2, '2020-12-09', 47, 74)
]

employees_schema = StructType([
    StructField('emp_id', IntegerType(), True),
    StructField('event_day', StringType(), True),
    StructField('in_time', IntegerType(), True),
    StructField('out_time', IntegerType(), True)
])

employees_df = spark.createDataFrame(employees_data, schema=employees_schema)

# Display the DataFrame
employees_df.show()


+------+----------+-------+--------+
|emp_id| event_day|in_time|out_time|
+------+----------+-------+--------+
|     1|2020-11-28|      4|      32|
|     1|2020-11-28|     55|     200|
|     1|2020-12-03|      1|      42|
|     2|2020-11-28|      3|      33|
|     2|2020-12-09|     47|      74|
+------+----------+-------+--------+



In [0]:
# Write a solution to calculate the total time in minutes spent by each employee on each day at the office. Note that within one day, an employee can enter and leave more than once. The time spent in the office for a single entry is out_time - in_time.

# Return the result table in any order.

# 1. simple group by the results after finding total time result in any order.


employees_df.withColumn('total_time', col('out_time') - col('in_time'))\
    .groupBy(col('emp_id'),col('event_day')).agg(sum(col('total_time')).alias('total_time')).show()

+------+----------+----------+
|emp_id| event_day|total_time|
+------+----------+----------+
|     1|2020-11-28|       173|
|     1|2020-12-03|        41|
|     2|2020-11-28|        30|
|     2|2020-12-09|        27|
+------+----------+----------+



## (Premium) 1173. Immediate Food Delivery I
```
Table: Delivery

+-----------------------------+---------+
| Column Name                 | Type    |
+-----------------------------+---------+
| delivery_id                 | int     |
| customer_id                 | int     |
| order_date                  | date    |
| customer_pref_delivery_date | date    |
+-----------------------------+---------+
delivery_id is the primary key (column with unique values) of this table.
The table holds information about food delivery to customers that make orders at some date and specify a preferred delivery date (on the same order date or after it).
 

If the customer's preferred delivery date is the same as the order date, then the order is called immediate; otherwise, it is called scheduled.

Write a solution to find the percentage of immediate orders in the table, rounded to 2 decimal places.

The result format is in the following example.

 

Example 1:

Input: 
Delivery table:
+-------------+-------------+------------+-----------------------------+
| delivery_id | customer_id | order_date | customer_pref_delivery_date |
+-------------+-------------+------------+-----------------------------+
| 1           | 1           | 2019-08-01 | 2019-08-02                  |
| 2           | 5           | 2019-08-02 | 2019-08-02                  |
| 3           | 1           | 2019-08-11 | 2019-08-11                  |
| 4           | 3           | 2019-08-24 | 2019-08-26                  |
| 5           | 4           | 2019-08-21 | 2019-08-22                  |
| 6           | 2           | 2019-08-11 | 2019-08-13                  |
+-------------+-------------+------------+-----------------------------+
Output: 
+----------------------+
| immediate_percentage |
+----------------------+
| 33.33                |
+----------------------+
Explanation: The orders with delivery id 2 and 3 are immediate while the others are scheduled.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

# Assuming spark session is already created
# spark = SparkSession.builder.getOrCreate()

# Data for delivery
delivery_data = [
    (1, 1, '2019-08-01', '2019-08-02'),
    (2, 5, '2019-08-02', '2019-08-02'),
    (3, 1, '2019-08-11', '2019-08-11'),
    (4, 3, '2019-08-24', '2019-08-26'),
    (5, 4, '2019-08-21', '2019-08-22'),
    (6, 2, '2019-08-11', '2019-08-13')
]

delivery_schema = StructType([
    StructField('delivery_id', IntegerType(), True),
    StructField('customer_id', IntegerType(), True),
    StructField('order_date', StringType(), True),
    StructField('customer_pref_delivery_date', StringType(), True)
])

delivery_df = spark.createDataFrame(delivery_data, schema=delivery_schema)

# Display the DataFrame
delivery_df.show()


+-----------+-----------+----------+---------------------------+
|delivery_id|customer_id|order_date|customer_pref_delivery_date|
+-----------+-----------+----------+---------------------------+
|          1|          1|2019-08-01|                 2019-08-02|
|          2|          5|2019-08-02|                 2019-08-02|
|          3|          1|2019-08-11|                 2019-08-11|
|          4|          3|2019-08-24|                 2019-08-26|
|          5|          4|2019-08-21|                 2019-08-22|
|          6|          2|2019-08-11|                 2019-08-13|
+-----------+-----------+----------+---------------------------+



In [0]:
# If the customer's preferred delivery date is the same as the order date, then the order is called immediate; otherwise, it is called scheduled.

# Write a solution to find the percentage of immediate orders in the table, rounded to 2 decimal places.

# 1.withColumnn Use case when order_date == customre_delivery_prep_date then 1 else 0. then apply avg function and round it to 2

delivery_df.withColumn('immediteOrders', when(col('order_date') == col('customer_pref_delivery_date'),1)\
    .otherwise(0)).agg(round(avg(col('immediteOrders'))*100,2).alias('immediate_percentage')).show()



+--------------------+
|immediate_percentage|
+--------------------+
|               33.33|
+--------------------+




## (Premium) 1445. Apples & Oranges

```
Table: Sales

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| sale_date     | date    |
| fruit         | enum    | 
| sold_num      | int     | 
+---------------+---------+
(sale_date, fruit) is the primary key (combination of columns with unique values) of this table.
This table contains the sales of "apples" and "oranges" sold each day.
 

Write a solution to report the difference between the number of apples and oranges sold each day.

Return the result table ordered by sale_date.

The result format is in the following example.

 

Example 1:

Input: 
Sales table:
+------------+------------+-------------+
| sale_date  | fruit      | sold_num    |
+------------+------------+-------------+
| 2020-05-01 | apples     | 10          |
| 2020-05-01 | oranges    | 8           |
| 2020-05-02 | apples     | 15          |
| 2020-05-02 | oranges    | 15          |
| 2020-05-03 | apples     | 20          |
| 2020-05-03 | oranges    | 0           |
| 2020-05-04 | apples     | 15          |
| 2020-05-04 | oranges    | 16          |
+------------+------------+-------------+
Output: 
+------------+--------------+
| sale_date  | diff         |
+------------+--------------+
| 2020-05-01 | 2            |
| 2020-05-02 | 0            |
| 2020-05-03 | 20           |
| 2020-05-04 | -1           |
+------------+--------------+
Explanation: 
Day 2020-05-01, 10 apples and 8 oranges were sold (Difference  10 - 8 = 2).
Day 2020-05-02, 15 apples and 15 oranges were sold (Difference 15 - 15 = 0).
Day 2020-05-03, 20 apples and 0 oranges were sold (Difference 20 - 0 = 20).
Day 2020-05-04, 15 apples and 16 oranges were sold (Difference 15 - 16 = -1).

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Assuming spark session is already created
# spark = SparkSession.builder.getOrCreate()

# Data for sales
sales_data = [
    ('2020-05-01', 'apples', 10),
    ('2020-05-01', 'oranges', 8),
    ('2020-05-02', 'apples', 15),
    ('2020-05-02', 'oranges', 15),
    ('2020-05-03', 'apples', 20),
    ('2020-05-03', 'oranges', 0),
    ('2020-05-04', 'apples', 15),
    ('2020-05-04', 'oranges', 16)
]

sales_schema = StructType([
    StructField('sale_date', StringType(), True),
    StructField('fruit', StringType(), True),
    StructField('sold_num', IntegerType(), True)
])

sales_df = spark.createDataFrame(sales_data, schema=sales_schema)

# Display the DataFrame
sales_df.show()


+----------+-------+--------+
| sale_date|  fruit|sold_num|
+----------+-------+--------+
|2020-05-01| apples|      10|
|2020-05-01|oranges|       8|
|2020-05-02| apples|      15|
|2020-05-02|oranges|      15|
|2020-05-03| apples|      20|
|2020-05-03|oranges|       0|
|2020-05-04| apples|      15|
|2020-05-04|oranges|      16|
+----------+-------+--------+



In [0]:
# Write a solution to report the difference between the number of apples and oranges sold each day.
# Return the result table ordered by sale_date.

# 1. first find the differnce by apples - oranges
# 2. Group by sale_date and apply sum on the diff.

sales_df.withColumn('applesDiff', when(col('fruit') == 'apples', col('sold_num')).otherwise(0))\
    .withColumn('orangeDiff', when(col('fruit') == 'oranges', col('sold_num')).otherwise(0)).groupBy(col('sale_date'))\
        .agg((sum(col('applesDiff')) - sum(col('orangeDiff'))).alias('diff')).show()

+----------+----+
| sale_date|diff|
+----------+----+
|2020-05-01|   2|
|2020-05-02|   0|
|2020-05-03|  20|
|2020-05-04|  -1|
+----------+----+



## (Premium)1699. Number of Calls Between Two Persons
```
Table: Calls

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| from_id     | int     |
| to_id       | int     |
| duration    | int     |
+-------------+---------+
This table does not have a primary key (column with unique values), it may contain duplicates.
This table contains the duration of a phone call between from_id and to_id.
from_id != to_id
 

Write a solution to report the number of calls and the total call duration between each pair of distinct persons (person1, person2) where person1 < person2.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Calls table:
+---------+-------+----------+
| from_id | to_id | duration |
+---------+-------+----------+
| 1       | 2     | 59       |
| 2       | 1     | 11       |
| 1       | 3     | 20       |
| 3       | 4     | 100      |
| 3       | 4     | 200      |
| 3       | 4     | 200      |
| 4       | 3     | 499      |
+---------+-------+----------+
Output: 
+---------+---------+------------+----------------+
| person1 | person2 | call_count | total_duration |
+---------+---------+------------+----------------+
| 1       | 2       | 2          | 70             |
| 1       | 3       | 1          | 20             |
| 3       | 4       | 4          | 999            |
+---------+---------+------------+----------------+
Explanation: 
Users 1 and 2 had 2 calls and the total duration is 70 (59 + 11).
Users 1 and 3 had 1 call and the total duration is 20.
Users 3 and 4 had 4 calls and the total duration is 999 (100 + 200 + 200 + 499).
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Assuming Spark session is already created
# spark = SparkSession.builder.getOrCreate()

# Define schema
schema = StructType([
    StructField('from_id', IntegerType(), True),
    StructField('to_id', IntegerType(), True),
    StructField('duration', IntegerType(), True)
])

# Sample data
data = [
    (1, 2, 59),
    (2, 1, 11),
    (1, 3, 20),
    (3, 4, 100),
    (3, 4, 200),
    (3, 4, 200),
    (4, 3, 499)
]

# Create DataFrame
calls_df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
calls_df.show()


+-------+-----+--------+
|from_id|to_id|duration|
+-------+-----+--------+
|      1|    2|      59|
|      2|    1|      11|
|      1|    3|      20|
|      3|    4|     100|
|      3|    4|     200|
|      3|    4|     200|
|      4|    3|     499|
+-------+-----+--------+



In [0]:
# Write a solution to report the number of calls and the total call duration between each pair of distinct persons (person1, person2) where person1 < person2.

# Return the result table in any order.

# 1. Here we want to find between each pair it means the call direction is from both sides. we do union by first select from_id to to_id the next union to_id to from_id this way we can make the columns as single source and group the data.
# 2. Then we can get count and sum the duration.

df1 = calls_df.select(col('from_id'),col('to_id'),col('duration')).filter(col('from_id') < col('to_id'))
df2 = calls_df.select(col('to_id'),col('from_id'),col('duration')).filter(col('from_id') > col('to_id'))
df1.union(df2).groupBy(col('from_id'),col('to_id'))\
    .agg(count(col('duration')).alias('call_count'), sum(col('duration')).alias('total_duration')).show()

+-------+-----+----------+--------------+
|from_id|to_id|call_count|total_duration|
+-------+-----+----------+--------------+
|      1|    2|         2|            70|
|      1|    3|         1|            20|
|      3|    4|         4|           999|
+-------+-----+----------+--------------+



# Sorting and Grouping

## 1587. Bank Account Summary II 
```
Table: Users

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| account      | int     |
| name         | varchar |
+--------------+---------+
account is the primary key (column with unique values) for this table.
Each row of this table contains the account number of each user in the bank.
There will be no two users having the same name in the table.
 

Table: Transactions

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| trans_id      | int     |
| account       | int     |
| amount        | int     |
| transacted_on | date    |
+---------------+---------+
trans_id is the primary key (column with unique values) for this table.
Each row of this table contains all changes made to all accounts.
amount is positive if the user received money and negative if they transferred money.
All accounts start with a balance of 0.
 

Write a solution to report the name and balance of users with a balance higher than 10000. The balance of an account is equal to the sum of the amounts of all transactions involving that account.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Users table:
+------------+--------------+
| account    | name         |
+------------+--------------+
| 900001     | Alice        |
| 900002     | Bob          |
| 900003     | Charlie      |
+------------+--------------+
Transactions table:
+------------+------------+------------+---------------+
| trans_id   | account    | amount     | transacted_on |
+------------+------------+------------+---------------+
| 1          | 900001     | 7000       |  2020-08-01   |
| 2          | 900001     | 7000       |  2020-09-01   |
| 3          | 900001     | -3000      |  2020-09-02   |
| 4          | 900002     | 1000       |  2020-09-12   |
| 5          | 900003     | 6000       |  2020-08-07   |
| 6          | 900003     | 6000       |  2020-09-07   |
| 7          | 900003     | -4000      |  2020-09-11   |
+------------+------------+------------+---------------+
Output: 
+------------+------------+
| name       | balance    |
+------------+------------+
| Alice      | 11000      |
+------------+------------+
Explanation: 
Alice's balance is (7000 + 7000 - 3000) = 11000.
Bob's balance is 1000.
Charlie's balance is (6000 + 6000 - 4000) = 8000.

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Assuming Spark session is already created
# spark = SparkSession.builder.getOrCreate()

# Define schema for the 'users' DataFrame
users_schema = StructType([
    StructField('account', IntegerType(), True),
    StructField('name', StringType(), True)
])

# Define schema for the 'transactions' DataFrame
transactions_schema = StructType([
    StructField('trans_id', IntegerType(), True),
    StructField('account', IntegerType(), True),
    StructField('amount', IntegerType(), True),
    StructField('transacted_on', StringType(), True)
])

# Sample data for 'users'
users_data = [
    (900001, 'Alice'),
    (900002, 'Bob'),
    (900003, 'Charlie')
]

# Sample data for 'transactions'
transactions_data = [
    (1, 900001, 7000, '2020-08-01'),
    (2, 900001, 7000, '2020-09-01'),
    (3, 900001, -3000, '2020-09-02'),
    (4, 900002, 1000, '2020-09-12'),
    (5, 900003, 6000, '2020-08-07'),
    (6, 900003, 6000, '2020-09-07'),
    (7, 900003, -4000, '2020-09-11')
]

# Create DataFrames
users_df = spark.createDataFrame(users_data, schema=users_schema)
transactions_df = spark.createDataFrame(transactions_data, schema=transactions_schema)

# Show the DataFrames
users_df.show()
transactions_df.show()


+-------+-------+
|account|   name|
+-------+-------+
| 900001|  Alice|
| 900002|    Bob|
| 900003|Charlie|
+-------+-------+

+--------+-------+------+-------------+
|trans_id|account|amount|transacted_on|
+--------+-------+------+-------------+
|       1| 900001|  7000|   2020-08-01|
|       2| 900001|  7000|   2020-09-01|
|       3| 900001| -3000|   2020-09-02|
|       4| 900002|  1000|   2020-09-12|
|       5| 900003|  6000|   2020-08-07|
|       6| 900003|  6000|   2020-09-07|
|       7| 900003| -4000|   2020-09-11|
+--------+-------+------+-------------+



In [0]:
# Write a solution to report the name and balance of users with a balance higher than 10000. The balance of an account is equal to the sum of the amounts of all transactions involving that account.

# Return the result table in any order.

# 1. Group by account and sum the amount and filter the amount > 1000
#Tip while passing a negative value into sum function it will automatically minus the amount.
transactions_df.alias('T').join(users_df.alias('U'),col('T.account') == col('U.account'),'left')\
    .groupBy(col('name')).agg(sum(col('amount')).alias('totalAmount')).filter(col('totalAmount') > 10000).show()

+-----+-----------+
| name|totalAmount|
+-----+-----------+
|Alice|      11000|
+-----+-----------+



## 182. Duplicate Emails
### Leve : Easy

```
Table: Person

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| id          | int     |
| email       | varchar |
+-------------+---------+
id is the primary key (column with unique values) for this table.
Each row of this table contains an email. The emails will not contain uppercase letters.
 

Write a solution to report all the duplicate emails. Note that it's guaranteed that the email field is not NULL.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Person table:
+----+---------+
| id | email   |
+----+---------+
| 1  | a@b.com |
| 2  | c@d.com |
| 3  | a@b.com |
+----+---------+
Output: 
+---------+
| Email   |
+---------+
| a@b.com |
+---------+
Explanation: a@b.com is repeated two times.

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder.appName("DataConversion").getOrCreate()

# Sample data
data = [(1, 'a@b.com'), (2, 'c@d.com'), (3, 'a@b.com')]
columns = ['id', 'email']

# Create PySpark DataFrame
person_df = spark.createDataFrame(data, columns)
person_df.show()

+---+-------+
| id|  email|
+---+-------+
|  1|a@b.com|
|  2|c@d.com|
|  3|a@b.com|
+---+-------+



In [0]:
person_df.groupBy(col('email')).agg(count('*').alias('cnt')).filter(col('cnt') > 1 ).select(col('email')).show()

+-------+
|  email|
+-------+
|a@b.com|
+-------+



## 1050. Actors and Directors Who Cooperated At Least Three Times
### Easy

```

Table: ActorDirector

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| actor_id    | int     |
| director_id | int     |
| timestamp   | int     |
+-------------+---------+
timestamp is the primary key (column with unique values) for this table.
 

Write a solution to find all the pairs (actor_id, director_id) where the actor has cooperated with the director at least three times.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
ActorDirector table:
+-------------+-------------+-------------+
| actor_id    | director_id | timestamp   |
+-------------+-------------+-------------+
| 1           | 1           | 0           |
| 1           | 1           | 1           |
| 1           | 1           | 2           |
| 1           | 2           | 3           |
| 1           | 2           | 4           |
| 2           | 1           | 5           |
| 2           | 1           | 6           |
+-------------+-------------+-------------+
Output: 
+-------------+-------------+
| actor_id    | director_id |
+-------------+-------------+
| 1           | 1           |
+-------------+-------------+
Explanation: The only pair is (1, 1) where they cooperated exactly 3 times.
```

In [0]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("DataConversion").getOrCreate()

# Sample data
data = [(1, 1, 0), (1, 1, 1), (1, 1, 2), (1, 2, 3), (1, 2, 4), (2, 1, 5), (2, 1, 6)]
columns = ['actor_id', 'director_id', 'timestamp']

# Create PySpark DataFrame
actor_director_df = spark.createDataFrame(data, columns)

actor_director_df.show()

+--------+-----------+---------+
|actor_id|director_id|timestamp|
+--------+-----------+---------+
|       1|          1|        0|
|       1|          1|        1|
|       1|          1|        2|
|       1|          2|        3|
|       1|          2|        4|
|       2|          1|        5|
|       2|          1|        6|
+--------+-----------+---------+



In [0]:
# Write a solution to find all the pairs (actor_id, director_id) where the actor has cooperated with the director at least three times.

# Return the result table in any order.

actor_director_df.groupBy(col('actor_id'),col('director_id')).agg(count('*').alias('Cnt')).filter(col('Cnt')>=3).drop(col('Cnt')).show()

+--------+-----------+
|actor_id|director_id|
+--------+-----------+
|       1|          1|
+--------+-----------+



## (Premium)1511. Customer Order Frequency
### Level: Easy
```
Table: Customers

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| customer_id   | int     |
| name          | varchar |
| country       | varchar |
+---------------+---------+
customer_id is the column with unique values for this table.
This table contains information about the customers in the company.
 

Table: Product

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| product_id    | int     |
| description   | varchar |
| price         | int     |
+---------------+---------+
product_id is the column with unique values for this table.
This table contains information on the products in the company.
price is the product cost.
 

Table: Orders

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| order_id      | int     |
| customer_id   | int     |
| product_id    | int     |
| order_date    | date    |
| quantity      | int     |
+---------------+---------+
order_id is the column with unique values for this table.
This table contains information on customer orders.
customer_id is the id of the customer who bought "quantity" products with id "product_id".
Order_date is the date in format ('YYYY-MM-DD') when the order was shipped.
 

Write a solution to report the customer_id and customer_name of customers who have spent at least $100 in each month of June and July 2020.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Customers table:
+--------------+-----------+-------------+
| customer_id  | name      | country     |
+--------------+-----------+-------------+
| 1            | Winston   | USA         |
| 2            | Jonathan  | Peru        |
| 3            | Moustafa  | Egypt       |
+--------------+-----------+-------------+
Product table:
+--------------+-------------+-------------+
| product_id   | description | price       |
+--------------+-------------+-------------+
| 10           | LC Phone    | 300         |
| 20           | LC T-Shirt  | 10          |
| 30           | LC Book     | 45          |
| 40           | LC Keychain | 2           |
+--------------+-------------+-------------+
Orders table:
+--------------+-------------+-------------+-------------+-----------+
| order_id     | customer_id | product_id  | order_date  | quantity  |
+--------------+-------------+-------------+-------------+-----------+
| 1            | 1           | 10          | 2020-06-10  | 1         |
| 2            | 1           | 20          | 2020-07-01  | 1         |
| 3            | 1           | 30          | 2020-07-08  | 2         |
| 4            | 2           | 10          | 2020-06-15  | 2         |
| 5            | 2           | 40          | 2020-07-01  | 10        |
| 6            | 3           | 20          | 2020-06-24  | 2         |
| 7            | 3           | 30          | 2020-06-25  | 2         |
| 9            | 3           | 30          | 2020-05-08  | 3         |
+--------------+-------------+-------------+-------------+-----------+
Output: 
+--------------+------------+
| customer_id  | name       |  
+--------------+------------+
| 1            | Winston    |
+--------------+------------+
Explanation: 
Winston spent $300 (300 * 1) in June and $100 ( 10 * 1 + 45 * 2) in July 2020.
Jonathan spent $600 (300 * 2) in June and $20 ( 2 * 10) in July 2020.
Moustafa spent $110 (10 * 2 + 45 * 2) in June and $0 in July 2020.

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Define data
data_customers = [(1, 'Winston', 'USA'), (2, 'Jonathan', 'Peru'), (3, 'Moustafa', 'Egypt')]
data_product = [(10, 'LC Phone', 300), (20, 'LC T-Shirt', 10), (30, 'LC Book', 45), (40, 'LC Keychain', 2)]
data_orders = [(1, 1, 10, '2020-06-10', 1), (2, 1, 20, '2020-07-01', 1), (3, 1, 30, '2020-07-08', 2), 
               (4, 2, 10, '2020-06-15', 2), (5, 2, 40, '2020-07-01', 10), (6, 3, 20, '2020-06-24', 2),
               (7, 3, 30, '2020-06-25', 2), (9, 3, 30, '2020-05-08', 3)]

# Create DataFrames
customers_df = spark.createDataFrame(data_customers, schema=['customer_id', 'name', 'country'])
product_df = spark.createDataFrame(data_product, schema=['product_id', 'description', 'price'])
orders_df = spark.createDataFrame(data_orders, schema=['order_id', 'customer_id', 'product_id', 'order_date', 'quantity'])

# Show DataFrames
customers_df.show()
product_df.show()
orders_df.show()


+-----------+--------+-------+
|customer_id|    name|country|
+-----------+--------+-------+
|          1| Winston|    USA|
|          2|Jonathan|   Peru|
|          3|Moustafa|  Egypt|
+-----------+--------+-------+

+----------+-----------+-----+
|product_id|description|price|
+----------+-----------+-----+
|        10|   LC Phone|  300|
|        20| LC T-Shirt|   10|
|        30|    LC Book|   45|
|        40|LC Keychain|    2|
+----------+-----------+-----+

+--------+-----------+----------+----------+--------+
|order_id|customer_id|product_id|order_date|quantity|
+--------+-----------+----------+----------+--------+
|       1|          1|        10|2020-06-10|       1|
|       2|          1|        20|2020-07-01|       1|
|       3|          1|        30|2020-07-08|       2|
|       4|          2|        10|2020-06-15|       2|
|       5|          2|        40|2020-07-01|      10|
|       6|          3|        20|2020-06-24|       2|
|       7|          3|        30|2020-06-25|   

In [0]:
# Write a solution to report the customer_id and customer_name of customers who have spent at least $100 in each month of June and July 2020.
from pyspark.sql.functions import *
orders_df.alias('O').filter((col('O.order_date').like('2020-06%') | col('O.order_date').like('2020-07%')))\
    .join(product_df.alias('P'), col('O.product_id') == col('P.product_id'), 'left').join(customers_df.alias('C'), col('C.customer_id') == col('O.customer_id'))\
        .groupBy(col('O.customer_id'), col('C.name'),month(col('order_date'))).agg(sum(col('quantity') * col('price')).alias('total_price')).filter(col('total_price') >= 100)\
            .groupBy(col('customer_id'),col('C.name')).agg(count('*').alias('cnt')).filter(col('cnt')>1).drop(col('cnt')).show()

+-----------+-------+
|customer_id|   name|
+-----------+-------+
|          1|Winston|
+-----------+-------+




## 1693. Daily Leads and Partners
### Level: Easy
```
Table: DailySales

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| date_id     | date    |
| make_name   | varchar |
| lead_id     | int     |
| partner_id  | int     |
+-------------+---------+
There is no primary key (column with unique values) for this table. It may contain duplicates.
This table contains the date and the name of the product sold and the IDs of the lead and partner it was sold to.
The name consists of only lowercase English letters.
 

For each date_id and make_name, find the number of distinct lead_id's and distinct partner_id's.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
DailySales table:
+-----------+-----------+---------+------------+
| date_id   | make_name | lead_id | partner_id |
+-----------+-----------+---------+------------+
| 2020-12-8 | toyota    | 0       | 1          |
| 2020-12-8 | toyota    | 1       | 0          |
| 2020-12-8 | toyota    | 1       | 2          |
| 2020-12-7 | toyota    | 0       | 2          |
| 2020-12-7 | toyota    | 0       | 1          |
| 2020-12-8 | honda     | 1       | 2          |
| 2020-12-8 | honda     | 2       | 1          |
| 2020-12-7 | honda     | 0       | 1          |
| 2020-12-7 | honda     | 1       | 2          |
| 2020-12-7 | honda     | 2       | 1          |
+-----------+-----------+---------+------------+
Output: 
+-----------+-----------+--------------+-----------------+
| date_id   | make_name | unique_leads | unique_partners |
+-----------+-----------+--------------+-----------------+
| 2020-12-8 | toyota    | 2            | 3               |
| 2020-12-7 | toyota    | 1            | 2               |
| 2020-12-8 | honda     | 2            | 2               |
| 2020-12-7 | honda     | 3            | 2               |
+-----------+-----------+--------------+-----------------+
Explanation: 
For 2020-12-8, toyota gets leads = [0, 1] and partners = [0, 1, 2] while honda gets leads = [1, 2] and partners = [1, 2].
For 2020-12-7, toyota gets leads = [0] and partners = [1, 2] while honda gets leads = [0, 1, 2] and partners = [1, 2].

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("daily_sales").getOrCreate()

# Define schema
schema = StructType([
    StructField("date_id", StringType(), True),
    StructField("make_name", StringType(), True),
    StructField("lead_id", IntegerType(), True),
    StructField("partner_id", IntegerType(), True)
])

# Data
data = [
    ['2020-12-8', 'toyota', 0, 1],
    ['2020-12-8', 'toyota', 1, 0],
    ['2020-12-8', 'toyota', 1, 2],
    ['2020-12-7', 'toyota', 0, 2],
    ['2020-12-7', 'toyota', 0, 1],
    ['2020-12-8', 'honda', 1, 2],
    ['2020-12-8', 'honda', 2, 1],
    ['2020-12-7', 'honda', 0, 1],
    ['2020-12-7', 'honda', 1, 2],
    ['2020-12-7', 'honda', 2, 1]
]

# Create DataFrame
daily_sales_df = spark.createDataFrame(data, schema=schema)

# Show DataFrame
daily_sales_df.show()


+---------+---------+-------+----------+
|  date_id|make_name|lead_id|partner_id|
+---------+---------+-------+----------+
|2020-12-8|   toyota|      0|         1|
|2020-12-8|   toyota|      1|         0|
|2020-12-8|   toyota|      1|         2|
|2020-12-7|   toyota|      0|         2|
|2020-12-7|   toyota|      0|         1|
|2020-12-8|    honda|      1|         2|
|2020-12-8|    honda|      2|         1|
|2020-12-7|    honda|      0|         1|
|2020-12-7|    honda|      1|         2|
|2020-12-7|    honda|      2|         1|
+---------+---------+-------+----------+



In [0]:
# For each date_id and make_name, find the number of distinct lead_id's and distinct partner_id's.

# Return the result table in any order.


daily_sales_df.groupBy(col('date_id'),col('make_name')).agg(countDistinct(col('lead_id')).alias('unique_leads'),countDistinct(col('partner_id')).alias('unique_partners'))\
    .show()


+---------+---------+------------+---------------+
|  date_id|make_name|unique_leads|unique_partners|
+---------+---------+------------+---------------+
|2020-12-8|   toyota|           2|              3|
|2020-12-7|    honda|           3|              2|
|2020-12-7|   toyota|           1|              2|
|2020-12-8|    honda|           2|              2|
+---------+---------+------------+---------------+



## (Premium) 1495. Friendly Movies Streamed Last Month
###Level: Easy

```

Table: TVProgram

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| program_date  | date    |
| content_id    | int     |
| channel       | varchar |
+---------------+---------+
(program_date, content_id) is the primary key (combination of columns with unique values) for this table.
This table contains information of the programs on the TV.
content_id is the id of the program in some channel on the TV.
 

Table: Content

+------------------+---------+
| Column Name      | Type    |
+------------------+---------+
| content_id       | varchar |
| title            | varchar |
| Kids_content     | enum    |
| content_type     | varchar |
+------------------+---------+
content_id is the primary key (column with unique values) for this table.
Kids_content is an ENUM (category) of types ('Y', 'N') where: 
'Y' means is content for kids otherwise 'N' is not content for kids.
content_type is the category of the content as movies, series, etc.
 

Write a solution to report the distinct titles of the kid-friendly movies streamed in June 2020.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
TVProgram table:
+--------------------+--------------+-------------+
| program_date       | content_id   | channel     |
+--------------------+--------------+-------------+
| 2020-06-10 08:00   | 1            | LC-Channel  |
| 2020-05-11 12:00   | 2            | LC-Channel  |
| 2020-05-12 12:00   | 3            | LC-Channel  |
| 2020-05-13 14:00   | 4            | Disney Ch   |
| 2020-06-18 14:00   | 4            | Disney Ch   |
| 2020-07-15 16:00   | 5            | Disney Ch   |
+--------------------+--------------+-------------+
Content table:
+------------+----------------+---------------+---------------+
| content_id | title          | Kids_content  | content_type  |
+------------+----------------+---------------+---------------+
| 1          | Leetcode Movie | N             | Movies        |
| 2          | Alg. for Kids  | Y             | Series        |
| 3          | Database Sols  | N             | Series        |
| 4          | Aladdin        | Y             | Movies        |
| 5          | Cinderella     | Y             | Movies        |
+------------+----------------+---------------+---------------+
Output: 
+--------------+
| title        |
+--------------+
| Aladdin      |
+--------------+
Explanation: 
"Leetcode Movie" is not a content for kids.
"Alg. for Kids" is not a movie.
"Database Sols" is not a movie
"Alladin" is a movie, content for kids and was streamed in June 2020.
"Cinderella" was not streamed in June 2020.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize Spark session
spark = SparkSession.builder.appName("tv_program_content").getOrCreate()

# Define schema for tv_program
tv_program_schema = StructType([
    StructField("program_date", StringType(), True),
    StructField("content_id", IntegerType(), True),
    StructField("channel", StringType(), True)
])

# Data for tv_program
tv_program_data = [
    ['2020-06-10 08:00', 1, 'LC-Channel'],
    ['2020-05-11 12:00', 2, 'LC-Channel'],
    ['2020-05-12 12:00', 3, 'LC-Channel'],
    ['2020-05-13 14:00', 4, 'Disney Ch'],
    ['2020-06-18 14:00', 4, 'Disney Ch'],
    ['2020-07-15 16:00', 5, 'Disney Ch']
]

# Create DataFrame for tv_program
tv_program_df = spark.createDataFrame(tv_program_data, schema=tv_program_schema)

# Define schema for content
content_schema = StructType([
    StructField("content_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("Kids_content", StringType(), True),
    StructField("content_type", StringType(), True)
])

# Data for content
content_data = [
    [1, 'Leetcode Movie', 'N', 'Movies'],
    [2, 'Alg. for Kids', 'Y', 'Series'],
    [3, 'Database Sols', 'N', 'Series'],
    [4, 'Aladdin', 'Y', 'Movies'],
    [5, 'Cinderella', 'Y', 'Movies']
]

# Create DataFrame for content
content_df = spark.createDataFrame(content_data, schema=content_schema)

# Show DataFrames
tv_program_df.show()
content_df.show()


+----------------+----------+----------+
|    program_date|content_id|   channel|
+----------------+----------+----------+
|2020-06-10 08:00|         1|LC-Channel|
|2020-05-11 12:00|         2|LC-Channel|
|2020-05-12 12:00|         3|LC-Channel|
|2020-05-13 14:00|         4| Disney Ch|
|2020-06-18 14:00|         4| Disney Ch|
|2020-07-15 16:00|         5| Disney Ch|
+----------------+----------+----------+

+----------+--------------+------------+------------+
|content_id|         title|Kids_content|content_type|
+----------+--------------+------------+------------+
|         1|Leetcode Movie|           N|      Movies|
|         2| Alg. for Kids|           Y|      Series|
|         3| Database Sols|           N|      Series|
|         4|       Aladdin|           Y|      Movies|
|         5|    Cinderella|           Y|      Movies|
+----------+--------------+------------+------------+



In [0]:
from pyspark.sql.functions import *
tv_program_df.alias('T').join(content_df.alias('P'), col('T.content_id') == col('P.content_id'),'left')\
    .filter((col('program_date').like('2020-06%')) & (col('kids_content') == 'Y' )).select(col('title')).show()

+-------+
|  title|
+-------+
|Aladdin|
+-------+



## (Premium) 1501. Countries You Can Safely Invest In
### Level: Medium
```
Table Person:

+----------------+---------+
| Column Name    | Type    |
+----------------+---------+
| id             | int     |
| name           | varchar |
| phone_number   | varchar |
+----------------+---------+
id is the column of unique values for this table.
Each row of this table contains the name of a person and their phone number.
Phone number will be in the form 'xxx-yyyyyyy' where xxx is the country code (3 characters) and yyyyyyy is the phone number (7 characters) where x and y are digits. Both can contain leading zeros.
 

Table Country:

+----------------+---------+
| Column Name    | Type    |
+----------------+---------+
| name           | varchar |
| country_code   | varchar |
+----------------+---------+
country_code is the column of unique values for this table.
Each row of this table contains the country name and its code. country_code will be in the form 'xxx' where x is digits.
 

Table Calls:

+-------------+------+
| Column Name | Type |
+-------------+------+
| caller_id   | int  |
| callee_id   | int  |
| duration    | int  |
+-------------+------+
This table may contain duplicate rows.
Each row of this table contains the caller id, callee id and the duration of the call in minutes. caller_id != callee_id
 

A telecommunications company wants to invest in new countries. The company intends to invest in the countries where the average call duration of the calls in this country is strictly greater than the global average call duration.

Write a solution to find the countries where this company can invest.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Person table:
+----+----------+--------------+
| id | name     | phone_number |
+----+----------+--------------+
| 3  | Jonathan | 051-1234567  |
| 12 | Elvis    | 051-7654321  |
| 1  | Moncef   | 212-1234567  |
| 2  | Maroua   | 212-6523651  |
| 7  | Meir     | 972-1234567  |
| 9  | Rachel   | 972-0011100  |
+----+----------+--------------+
Country table:
+----------+--------------+
| name     | country_code |
+----------+--------------+
| Peru     | 051          |
| Israel   | 972          |
| Morocco  | 212          |
| Germany  | 049          |
| Ethiopia | 251          |
+----------+--------------+
Calls table:
+-----------+-----------+----------+
| caller_id | callee_id | duration |
+-----------+-----------+----------+
| 1         | 9         | 33       |
| 2         | 9         | 4        |
| 1         | 2         | 59       |
| 3         | 12        | 102      |
| 3         | 12        | 330      |
| 12        | 3         | 5        |
| 7         | 9         | 13       |
| 7         | 1         | 3        |
| 9         | 7         | 1        |
| 1         | 7         | 7        |
+-----------+-----------+----------+
Output: 
+----------+
| country  |
+----------+
| Peru     |
+----------+
Explanation: 
The average call duration for Peru is (102 + 102 + 330 + 330 + 5 + 5) / 6 = 145.666667
The average call duration for Israel is (33 + 4 + 13 + 13 + 3 + 1 + 1 + 7) / 8 = 9.37500
The average call duration for Morocco is (33 + 4 + 59 + 59 + 3 + 7) / 6 = 27.5000 
Global call duration average = (2 * (33 + 4 + 59 + 102 + 330 + 5 + 13 + 3 + 1 + 7)) / 20 = 55.70000
Since Peru is the only country where the average call duration is greater than the global average, it is the only recommended country.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("person_country_calls").getOrCreate()

# Define schema for person
person_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("phone_number", StringType(), True)
])

# Data for person
person_data = [
    [3, 'Jonathan', '051-1234567'],
    [12, 'Elvis', '051-7654321'],
    [1, 'Moncef', '212-1234567'],
    [2, 'Maroua', '212-6523651'],
    [7, 'Meir', '972-1234567'],
    [9, 'Rachel', '972-0011100']
]

# Create DataFrame for person
person_df = spark.createDataFrame(person_data, schema=person_schema)

# Define schema for country
country_schema = StructType([
    StructField("name", StringType(), True),
    StructField("country_code", StringType(), True)
])

# Data for country
country_data = [
    ['Peru', '051'],
    ['Israel', '972'],
    ['Morocco', '212'],
    ['Germany', '049'],
    ['Ethiopia', '251']
]

# Create DataFrame for country
country_df = spark.createDataFrame(country_data, schema=country_schema)

# Define schema for calls
calls_schema = StructType([
    StructField("caller_id", IntegerType(), True),
    StructField("callee_id", IntegerType(), True),
    StructField("duration", IntegerType(), True)
])

# Data for calls
calls_data = [
    [1, 9, 33],
    [2, 9, 4],
    [1, 2, 59],
    [3, 12, 102],
    [3, 12, 330],
    [12, 3, 5],
    [7, 9, 13],
    [7, 1, 3],
    [9, 7, 1],
    [1, 7, 7]
]

# Create DataFrame for calls
calls_df = spark.createDataFrame(calls_data, schema=calls_schema)

# Show DataFrames
person_df.show()
country_df.show()
calls_df.show()


+---+--------+------------+
| id|    name|phone_number|
+---+--------+------------+
|  3|Jonathan| 051-1234567|
| 12|   Elvis| 051-7654321|
|  1|  Moncef| 212-1234567|
|  2|  Maroua| 212-6523651|
|  7|    Meir| 972-1234567|
|  9|  Rachel| 972-0011100|
+---+--------+------------+

+--------+------------+
|    name|country_code|
+--------+------------+
|    Peru|         051|
|  Israel|         972|
| Morocco|         212|
| Germany|         049|
|Ethiopia|         251|
+--------+------------+

+---------+---------+--------+
|caller_id|callee_id|duration|
+---------+---------+--------+
|        1|        9|      33|
|        2|        9|       4|
|        1|        2|      59|
|        3|       12|     102|
|        3|       12|     330|
|       12|        3|       5|
|        7|        9|      13|
|        7|        1|       3|
|        9|        7|       1|
|        1|        7|       7|
+---------+---------+--------+



In [0]:
# A telecommunications company wants to invest in new countries. The company intends to invest in the countries where the average call duration of the calls in this country is strictly greater than the global average call duration.

# Write a solution to find the countries where this company can invest.

# Return the result table in any order.



# Advanced Select and Joins

## (Premium) 603. Consecutive Available Seats
```
Table: Cinema

+-------------+------+
| Column Name | Type |
+-------------+------+
| seat_id     | int  |
| free        | bool |
+-------------+------+
seat_id is an auto-increment column for this table.
Each row of this table indicates whether the ith seat is free or not. 1 means free while 0 means occupied.
 

Find all the consecutive available seats in the cinema.

Return the result table ordered by seat_id in ascending order.

The test cases are generated so that more than two seats are consecutively available.

The result format is in the following example.

 

Example 1:

Input: 
Cinema table:
+---------+------+
| seat_id | free |
+---------+------+
| 1       | 1    |
| 2       | 0    |
| 3       | 1    |
| 4       | 1    |
| 5       | 1    |
+---------+------+
Output: 
+---------+
| seat_id |
+---------+
| 3       |
| 4       |
| 5       |
+---------+
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("cinema").getOrCreate()

# Define schema for cinema
cinema_schema = StructType([
    StructField("seat_id", IntegerType(), True),
    StructField("free", IntegerType(), True)
])

# Data for cinema
cinema_data = [
    [1, 1],
    [2, 0],
    [3, 1],
    [4, 1],
    [5, 1]
]

# Create DataFrame for cinema
cinema_df = spark.createDataFrame(cinema_data, schema=cinema_schema)

# Show DataFrame
cinema_df.show()


+-------+----+
|seat_id|free|
+-------+----+
|      1|   1|
|      2|   0|
|      3|   1|
|      4|   1|
|      5|   1|
+-------+----+



## 1795. Rearrange Products Table
```
Table: Products

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| product_id  | int     |
| store1      | int     |
| store2      | int     |
| store3      | int     |
+-------------+---------+
product_id is the primary key (column with unique values) for this table.
Each row in this table indicates the product's price in 3 different stores: store1, store2, and store3.
If the product is not available in a store, the price will be null in that store's column.
 

Write a solution to rearrange the Products table so that each row has (product_id, store, price). If a product is not available in a store, do not include a row with that product_id and store combination in the result table.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Products table:
+------------+--------+--------+--------+
| product_id | store1 | store2 | store3 |
+------------+--------+--------+--------+
| 0          | 95     | 100    | 105    |
| 1          | 70     | null   | 80     |
+------------+--------+--------+--------+
Output: 
+------------+--------+-------+
| product_id | store  | price |
+------------+--------+-------+
| 0          | store1 | 95    |
| 0          | store2 | 100   |
| 0          | store3 | 105   |
| 1          | store1 | 70    |
| 1          | store3 | 80    |
+------------+--------+-------+
Explanation: 
Product 0 is available in all three stores with prices 95, 100, and 105 respectively.
Product 1 is available in store1 with price 70 and store3 with price 80. The product is not available in store2.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Define schema for the DataFrame
products_schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('store1', IntegerType(), True),
    StructField('store2', IntegerType(), True),
    StructField('store3', IntegerType(), True)
])

# Sample data for 'products'
products_data = [
    (0, 95, 100, 105),
    (1, 70, None, 80)  # None represents null values
]

# Create DataFrame
products_df = spark.createDataFrame(products_data, schema=products_schema)

# Show the DataFrame
products_df.show()


+----------+------+------+------+
|product_id|store1|store2|store3|
+----------+------+------+------+
|         0|    95|   100|   105|
|         1|    70|  null|    80|
+----------+------+------+------+



In [0]:
# Write a solution to rearrange the Products table so that each row has (product_id, store, price). If a product is not available in a store, do not include a row with that product_id and store combination in the result table.

# Return the result table in any order.
#1. This need to be unpivoted
from pyspark.sql.functions import expr
unpivotExpr = "stack(3,'store1', store1, 'store2', store2, 'store3', store3) as (store, price)"
unpivotDf = products_df.select(col('product_id'),expr(unpivotExpr)).where("price is not null").show()


+----------+------+-----+
|product_id| store|price|
+----------+------+-----+
|         0|store1|   95|
|         0|store2|  100|
|         0|store3|  105|
|         1|store1|   70|
|         1|store3|   80|
+----------+------+-----+



## (Premium) 613. Shortest Distance in a Line
```
Table: Point

+-------------+------+
| Column Name | Type |
+-------------+------+
| x           | int  |
+-------------+------+
In SQL, x is the primary key column for this table.
Each row of this table indicates the position of a point on the X-axis.
 

Find the shortest distance between any two points from the Point table.

The result format is in the following example.

 

Example 1:

Input: 
Point table:
+----+
| x  |
+----+
| -1 |
| 0  |
| 2  |
+----+
Output: 
+----------+
| shortest |
+----------+
| 1        |
+----------+
Explanation: The shortest distance is between points -1 and 0 which is |(-1) - 0| = 1.
 

Follow up: How could you optimize your solution if the Point table is ordered in ascending order?
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Sample data
data = [[-1], [0], [2]]
columns = ['x']

# Create PySpark DataFrame
point_df = spark.createDataFrame(data, columns).withColumn("x", col("x").cast("int"))

# Show the DataFrame
point_df.show()


+---+
|  x|
+---+
| -1|
|  0|
|  2|
+---+



In [0]:
from pyspark.sql.window import Window
windowOptions = Window.orderBy(col('x'))
point_df.withColumn('shortest',lag(col('x')).over(windowOptions)).select(min(col('x') - col('shortest')).alias('shortest')).show()

+--------+
|shortest|
+--------+
|       1|
+--------+



## 1965. Employees With Missing Information
```
Table: Employees

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| employee_id | int     |
| name        | varchar |
+-------------+---------+
employee_id is the column with unique values for this table.
Each row of this table indicates the name of the employee whose ID is employee_id.
 

Table: Salaries

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| employee_id | int     |
| salary      | int     |
+-------------+---------+
employee_id is the column with unique values for this table.
Each row of this table indicates the salary of the employee whose ID is employee_id.
 

Write a solution to report the IDs of all the employees with missing information. The information of an employee is missing if:

The employee's name is missing, or
The employee's salary is missing.
Return the result table ordered by employee_id in ascending order.

The result format is in the following example.

 

Example 1:

Input: 
Employees table:
+-------------+----------+
| employee_id | name     |
+-------------+----------+
| 2           | Crew     |
| 4           | Haven    |
| 5           | Kristian |
+-------------+----------+
Salaries table:
+-------------+--------+
| employee_id | salary |
+-------------+--------+
| 5           | 76071  |
| 1           | 22517  |
| 4           | 63539  |
+-------------+--------+
Output: 
+-------------+
| employee_id |
+-------------+
| 1           |
| 2           |
+-------------+
Explanation: 
Employees 1, 2, 4, and 5 are working at this company.
The name of employee 1 is missing.
The salary of employee 2 is missing.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Sample data for employees
employees_data = [[2, 'Crew'], [4, 'Haven'], [5, 'Kristian']]
employees_columns = ['employee_id', 'name']

# Sample data for salaries
salaries_data = [[5, 76071], [1, 22517], [4, 63539]]
salaries_columns = ['employee_id', 'salary']

# Create PySpark DataFrames
employees_df = spark.createDataFrame(employees_data, employees_columns).withColumn("employee_id", col("employee_id").cast("int"))
salaries_df = spark.createDataFrame(salaries_data, salaries_columns).withColumn("employee_id", col("employee_id").cast("int"))

# Show the DataFrames
employees_df.show()
salaries_df.show()


+-----------+--------+
|employee_id|    name|
+-----------+--------+
|          2|    Crew|
|          4|   Haven|
|          5|Kristian|
+-----------+--------+

+-----------+------+
|employee_id|salary|
+-----------+------+
|          5| 76071|
|          1| 22517|
|          4| 63539|
+-----------+------+



In [0]:
# Write a solution to report the IDs of all the employees with missing information. The information of an employee is missing if:

# The employee's name is missing, or
# The employee's salary is missing.
# Return the result table ordered by employee_id in ascending order.

from pyspark.sql.functions import *
df1 = employees_df.alias('E').join(salaries_df.alias('S'), col('E.employee_id') == col('S.employee_id'),'full')\
        .filter((col('E.name').isNull() | (col('S.salary').isNull()))).select(col('E.employee_id'))

df2 = employees_df.alias('E').join(salaries_df.alias('S'), col('E.employee_id') == col('S.employee_id'),'full')\
        .filter((col('E.name').isNull() | (col('S.salary').isNull()))).select(col('S.employee_id'))
df1.union(df2).filter(~col('employee_id').isNull()).distinct().orderBy(col('employee_id')).show()

+-----------+
|employee_id|
+-----------+
|          1|
|          2|
+-----------+



## (Premium) 1264. Page Recommendations
```
Table: Friendship

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| user1_id      | int     |
| user2_id      | int     |
+---------------+---------+
(user1_id, user2_id) is the primary key (combination of columns with unique values) for this table.
Each row of this table indicates that there is a friendship relation between user1_id and user2_id.
 

Table: Likes

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| user_id     | int     |
| page_id     | int     |
+-------------+---------+
(user_id, page_id) is the primary key (combination of columns with unique values) for this table.
Each row of this table indicates that user_id likes page_id.
 

Write a solution to recommend pages to the user with user_id = 1 using the pages that your friends liked. It should not recommend pages you already liked.

Return result table in any order without duplicates.

The result format is in the following example.

 

Example 1:

Input: 
Friendship table:
+----------+----------+
| user1_id | user2_id |
+----------+----------+
| 1        | 2        |
| 1        | 3        |
| 1        | 4        |
| 2        | 3        |
| 2        | 4        |
| 2        | 5        |
| 6        | 1        |
+----------+----------+
Likes table:
+---------+---------+
| user_id | page_id |
+---------+---------+
| 1       | 88      |
| 2       | 23      |
| 3       | 24      |
| 4       | 56      |
| 5       | 11      |
| 6       | 33      |
| 2       | 77      |
| 3       | 77      |
| 6       | 88      |
+---------+---------+
Output: 
+------------------+
| recommended_page |
+------------------+
| 23               |
| 24               |
| 56               |
| 33               |
| 77               |
+------------------+
Explanation: 
User one is friend with users 2, 3, 4 and 6.
Suggested pages are 23 from user 2, 24 from user 3, 56 from user 3 and 33 from user 6.
Page 77 is suggested from both user 2 and user 3.
Page 88 is not suggested because user 1 already likes it.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Assuming the Spark session is already created as `spark`

# Creating the schema for the friendship DataFrame
friendship_schema = StructType([
    StructField("user1_id", IntegerType(), True),
    StructField("user2_id", IntegerType(), True)
])

# Creating the friendship DataFrame
friendship_data = [
    (1, 2), (1, 3), (1, 4),
    (2, 3), (2, 4), (2, 5),
    (6, 1)
]
friendship_df = spark.createDataFrame(friendship_data, schema=friendship_schema)
friendship_df.show()

# Creating the schema for the likes DataFrame
likes_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("page_id", IntegerType(), True)
])

# Creating the likes DataFrame
likes_data = [
    (1, 88), (2, 23), (3, 24),
    (4, 56), (5, 11), (6, 33),
    (2, 77), (3, 77), (6, 88)
]
likes_df = spark.createDataFrame(likes_data, schema=likes_schema)
likes_df.show()


+--------+--------+
|user1_id|user2_id|
+--------+--------+
|       1|       2|
|       1|       3|
|       1|       4|
|       2|       3|
|       2|       4|
|       2|       5|
|       6|       1|
+--------+--------+

+-------+-------+
|user_id|page_id|
+-------+-------+
|      1|     88|
|      2|     23|
|      3|     24|
|      4|     56|
|      5|     11|
|      6|     33|
|      2|     77|
|      3|     77|
|      6|     88|
+-------+-------+



In [0]:
from pyspark.sql.functions import *

cte_df = friendship_df.select(col("user1_id"), col("user2_id")) \
    .unionAll(friendship_df.select(col("user2_id").alias("user1_id"), col("user1_id").alias("user2_id")))

result_df = cte_df.join(likes_df, cte_df.user2_id == likes_df.user_id, how='left') \
    .select(cte_df.user2_id, likes_df.user_id, likes_df.page_id) \
    .where(cte_df.user1_id == 1).select(col('page_id'))

result_df.distinct().show()

+-------+
|page_id|
+-------+
|     23|
|     77|
|     24|
|     56|
|     88|
|     33|
+-------+



## 608. Tree Node
### Level: Medium
```
Table: Tree

+-------------+------+
| Column Name | Type |
+-------------+------+
| id          | int  |
| p_id        | int  |
+-------------+------+
id is the column with unique values for this table.
Each row of this table contains information about the id of a node and the id of its parent node in a tree.
The given structure is always a valid tree.
 

Each node in the tree can be one of three types:

"Leaf": if the node is a leaf node.
"Root": if the node is the root of the tree.
"Inner": If the node is neither a leaf node nor a root node.
Write a solution to report the type of each node in the tree.

Return the result table in any order.

The result format is in the following example.

 
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("tree").getOrCreate()

# Define schema for tree
tree_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("p_id", IntegerType(), True)
])

# Data for tree
tree_data = [
    [1, None],
    [2, 1],
    [3, 1],
    [4, 2],
    [5, 2]
]

# Create DataFrame for tree
tree_df = spark.createDataFrame(tree_data, schema=tree_schema)

# Show DataFrame
tree_df.show()


+---+----+
| id|p_id|
+---+----+
|  1|null|
|  2|   1|
|  3|   1|
|  4|   2|
|  5|   2|
+---+----+



In [0]:
from pyspark.sql.functions import *
distinct_p_ids = [row['p_id'] for row in tree_df.select(col('p_id')).distinct().collect()]
tree_df.withColumn('Type', when(col('p_id').isNull(),'Root')\
                .when(col('id').isin(distinct_p_ids) & (col('p_id').isNotNull()) ,'Inner')\
                .otherwise('Leaf')).drop(col('P_id')).show()

+---+-----+
| id| Type|
+---+-----+
|  1| Root|
|  2|Inner|
|  3| Leaf|
|  4| Leaf|
|  5| Leaf|
+---+-----+



## (Premium) 534. Game Play Analysis III
### Level: Medium
```
Table: Activity

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| player_id    | int     |
| device_id    | int     |
| event_date   | date    |
| games_played | int     |
+--------------+---------+
(player_id, event_date) is the primary key (column with unique values) of this table.
This table shows the activity of players of some games.
Each row is a record of a player who logged in and played a number of games (possibly 0) before logging out on someday using some device.
 

Write a solution to report for each player and date, how many games played so far by the player. That is, the total number of games played by the player until that date. Check the example for clarity.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Activity table:
+-----------+-----------+------------+--------------+
| player_id | device_id | event_date | games_played |
+-----------+-----------+------------+--------------+
| 1         | 2         | 2016-03-01 | 5            |
| 1         | 2         | 2016-05-02 | 6            |
| 1         | 3         | 2017-06-25 | 1            |
| 3         | 1         | 2016-03-02 | 0            |
| 3         | 4         | 2018-07-03 | 5            |
+-----------+-----------+------------+--------------+
Output: 
+-----------+------------+---------------------+
| player_id | event_date | games_played_so_far |
+-----------+------------+---------------------+
| 1         | 2016-03-01 | 5                   |
| 1         | 2016-05-02 | 11                  |
| 1         | 2017-06-25 | 12                  |
| 3         | 2016-03-02 | 0                   |
| 3         | 2018-07-03 | 5                   |
+-----------+------------+---------------------+
Explanation: 
For the player with id 1, 5 + 6 = 11 games played by 2016-05-02, and 5 + 6 + 1 = 12 games played by 2017-06-25.
For the player with id 3, 0 + 5 = 5 games played by 2018-07-03.
Note that for each player we only care about the days when the player logged in.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType

# Assuming you have an active Spark session
# spark = SparkSession.builder.appName("example").getOrCreate()

# Define the schema
schema = StructType([
    StructField("player_id", IntegerType(), True),
    StructField("device_id", IntegerType(), True),
    StructField("event_date", StringType(), True),
    StructField("games_played", IntegerType(), True)
])

# Data
data = [
    (1, 2, '2016-03-01', 5),
    (1, 2, '2016-05-02', 6),
    (1, 3, '2017-06-25', 1),
    (3, 1, '2016-03-02', 0),
    (3, 4, '2018-07-03', 5)
]

# Create the DataFrame
activity_df = spark.createDataFrame(data, schema=schema)

# Show the DataFrame
activity_df.show()


+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-05-02|           6|
|        1|        3|2017-06-25|           1|
|        3|        1|2016-03-02|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



In [0]:
#Write a solution to report for each player and date, how many games played so far by the player.
#That is, the total number of games played by the player until that date. Check the example for clarity.

#Return the result table in any order.
from pyspark.sql.window import Window
from pyspark.sql.functions import *
windowOptions = Window.partitionBy(col('player_id')).orderBy(col('event_date'))
activity_df.withColumn('games_played_so_far', sum(col('games_played')).over(windowOptions)).show()

+---------+---------+----------+------------+-------------------+
|player_id|device_id|event_date|games_played|games_played_so_far|
+---------+---------+----------+------------+-------------------+
|        1|        2|2016-03-01|           5|                  5|
|        1|        2|2016-05-02|           6|                 11|
|        1|        3|2017-06-25|           1|                 12|
|        3|        1|2016-03-02|           0|                  0|
|        3|        4|2018-07-03|           5|                  5|
+---------+---------+----------+------------+-------------------+



## (Premium) 1783. Grand Slam Titles
### Level: Medium

```
Table: Players

+----------------+---------+
| Column Name    | Type    |
+----------------+---------+
| player_id      | int     |
| player_name    | varchar |
+----------------+---------+
player_id is the primary key (column with unique values) for this table.
Each row in this table contains the name and the ID of a tennis player.
 

Table: Championships

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| year          | int     |
| Wimbledon     | int     |
| Fr_open       | int     |
| US_open       | int     |
| Au_open       | int     |
+---------------+---------+
year is the primary key (column with unique values) for this table.
Each row of this table contains the IDs of the players who won one each tennis tournament of the grand slam.
 

Write a solution to report the number of grand slam tournaments won by each player. Do not include the players who did not win any tournament.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Players table:
+-----------+-------------+
| player_id | player_name |
+-----------+-------------+
| 1         | Nadal       |
| 2         | Federer     |
| 3         | Novak       |
+-----------+-------------+
Championships table:
+------+-----------+---------+---------+---------+
| year | Wimbledon | Fr_open | US_open | Au_open |
+------+-----------+---------+---------+---------+
| 2018 | 1         | 1       | 1       | 1       |
| 2019 | 1         | 1       | 2       | 2       |
| 2020 | 2         | 1       | 2       | 2       |
+------+-----------+---------+---------+---------+
Output: 
+-----------+-------------+-------------------+
| player_id | player_name | grand_slams_count |
+-----------+-------------+-------------------+
| 2         | Federer     | 5                 |
| 1         | Nadal       | 7                 |
+-----------+-------------+-------------------+
Explanation: 
Player 1 (Nadal) won 7 titles: Wimbledon (2018, 2019), Fr_open (2018, 2019, 2020), US_open (2018), and Au_open (2018).
Player 2 (Federer) won 5 titles: Wimbledon (2020), US_open (2019, 2020), and Au_open (2019, 2020).
Player 3 (Novak) did not win anything, we did not include them in the result table.

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Define schema and create DataFrame for players
players_schema = StructType([
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True)
])
players_data = [(1, 'Nadal'), (2, 'Federer'), (3, 'Novak')]
players_df = spark.createDataFrame(players_data, schema=players_schema)

# Define schema and create DataFrame for championships
championships_schema = StructType([
    StructField("year", IntegerType(), True),
    StructField("Wimbledon", IntegerType(), True),
    StructField("Fr_open", IntegerType(), True),
    StructField("US_open", IntegerType(), True),
    StructField("Au_open", IntegerType(), True)
])
championships_data = [(2018, 1, 1, 1, 1), (2019, 1, 1, 2, 2), (2020, 2, 1, 2, 2)]
championships_df = spark.createDataFrame(championships_data, schema=championships_schema)
players_df.show()
championships_df.show()


+---------+-----------+
|player_id|player_name|
+---------+-----------+
|        1|      Nadal|
|        2|    Federer|
|        3|      Novak|
+---------+-----------+

+----+---------+-------+-------+-------+
|year|Wimbledon|Fr_open|US_open|Au_open|
+----+---------+-------+-------+-------+
|2018|        1|      1|      1|      1|
|2019|        1|      1|      2|      2|
|2020|        2|      1|      2|      2|
+----+---------+-------+-------+-------+



In [0]:
unpivoted_df = championships_df.selectExpr(
    "year",
    "stack(4, 'Wimbledon', Wimbledon, 'Fr_open', Fr_open, 'US_open', US_open, 'Au_open', Au_open) as (championship, wins)"
)

unpivoted_df.alias('U').join(players_df.alias('P'),col('U.wins') == col('P.player_id'),'left')\
.groupBy(col('player_id'),col('player_name')).agg(count(col('wins'))).show()


+---------+-----------+-----------+
|player_id|player_name|count(wins)|
+---------+-----------+-----------+
|        1|      Nadal|          7|
|        2|    Federer|          5|
+---------+-----------+-----------+



## (Premium) 1747. Leetflex Banned Accounts
### Level: Medium

```

Table: LogInfo

+-------------+----------+
| Column Name | Type     |
+-------------+----------+
| account_id  | int      |
| ip_address  | int      |
| login       | datetime |
| logout      | datetime |
+-------------+----------+
This table may contain duplicate rows.
The table contains information about the login and logout dates of Leetflex accounts. It also contains the IP address from which the account was logged in and out.
It is guaranteed that the logout time is after the login time.
 

Write a solution to find the account_id of the accounts that should be banned from Leetflex. An account should be banned if it was logged in at some moment from two different IP addresses.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
LogInfo table:
+------------+------------+---------------------+---------------------+
| account_id | ip_address | login               | logout              |
+------------+------------+---------------------+---------------------+
| 1          | 1          | 2021-02-01 09:00:00 | 2021-02-01 09:30:00 |
| 1          | 2          | 2021-02-01 08:00:00 | 2021-02-01 11:30:00 |
| 2          | 6          | 2021-02-01 20:30:00 | 2021-02-01 22:00:00 |
| 2          | 7          | 2021-02-02 20:30:00 | 2021-02-02 22:00:00 |
| 3          | 9          | 2021-02-01 16:00:00 | 2021-02-01 16:59:59 |
| 3          | 13         | 2021-02-01 17:00:00 | 2021-02-01 17:59:59 |
| 4          | 10         | 2021-02-01 16:00:00 | 2021-02-01 17:00:00 |
| 4          | 11         | 2021-02-01 17:00:00 | 2021-02-01 17:59:59 |
+------------+------------+---------------------+---------------------+
Output: 
+------------+
| account_id |
+------------+
| 1          |
| 4          |
+------------+
Explanation: 
Account ID 1 --> The account was active from "2021-02-01 09:00:00" to "2021-02-01 09:30:00" with two different IP addresses (1 and 2). It should be banned.
Account ID 2 --> The account was active from two different addresses (6, 7) but in two different times.
Account ID 3 --> The account was active from two different addresses (9, 13) on the same day but they do not intersect at any moment.
Account ID 4 --> The account was active from "2021-02-01 17:00:00" to "2021-02-01 17:00:00" with two different IP addresses (10 and 11). It should be banned.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, 1, '2021-02-01 09:00:00', '2021-02-01 09:30:00'),
    (1, 2, '2021-02-01 08:00:00', '2021-02-01 11:30:00'),
    (2, 6, '2021-02-01 20:30:00', '2021-02-01 22:00:00'),
    (2, 7, '2021-02-02 20:30:00', '2021-02-02 22:00:00'),
    (3, 9, '2021-02-01 16:00:00', '2021-02-01 16:59:59'),
    (3, 13, '2021-02-01 17:00:00', '2021-02-01 17:59:59'),
    (4, 10, '2021-02-01 16:00:00', '2021-02-01 17:00:00'),
    (4, 11, '2021-02-01 17:00:00', '2021-02-01 17:59:59')
]

# Define schema
schema = ['account_id', 'ip_address', 'login', 'logout']

# Create DataFrame
log_info_df = spark.createDataFrame(data, schema=schema)
log_info_df.show()

+----------+----------+-------------------+-------------------+
|account_id|ip_address|              login|             logout|
+----------+----------+-------------------+-------------------+
|         1|         1|2021-02-01 09:00:00|2021-02-01 09:30:00|
|         1|         2|2021-02-01 08:00:00|2021-02-01 11:30:00|
|         2|         6|2021-02-01 20:30:00|2021-02-01 22:00:00|
|         2|         7|2021-02-02 20:30:00|2021-02-02 22:00:00|
|         3|         9|2021-02-01 16:00:00|2021-02-01 16:59:59|
|         3|        13|2021-02-01 17:00:00|2021-02-01 17:59:59|
|         4|        10|2021-02-01 16:00:00|2021-02-01 17:00:00|
|         4|        11|2021-02-01 17:00:00|2021-02-01 17:59:59|
+----------+----------+-------------------+-------------------+



In [0]:
#Write a solution to find the account_id of the accounts that should be banned from Leetflex.
#An account should be banned if it was logged in at some moment from two different IP addresses.

#Return the result table in any order.
from pyspark.sql.functions import col

cross_join_df = log_info_df.alias('l1').crossJoin(log_info_df.alias('l2'))

filtered_df = cross_join_df.filter(
    (col('l1.account_id') == col('l2.account_id')) &
    (col('l1.ip_address') != col('l2.ip_address')) &
    (col('l1.login') <= col('l2.logout')) &
    (col('l2.login') <= col('l1.logout'))
)

result_df = filtered_df.select(col('l1.account_id')).distinct()

result_df.show()



+----------+
|account_id|
+----------+
|         1|
|         4|
+----------+



#Subqueries

## (Premium) 1350. Students With Invalid Departments
```
Table: Departments

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| id            | int     |
| name          | varchar |
+---------------+---------+
In SQL, id is the primary key of this table.
The table has information about the id of each department of a university.
 

Table: Students

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| id            | int     |
| name          | varchar |
| department_id | int     |
+---------------+---------+
In SQL, id is the primary key of this table.
The table has information about the id of each student at a university and the id of the department he/she studies at.
 

Find the id and the name of all students who are enrolled in departments that no longer exist.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Departments table:
+------+--------------------------+
| id   | name                     |
+------+--------------------------+
| 1    | Electrical Engineering   |
| 7    | Computer Engineering     |
| 13   | Bussiness Administration |
+------+--------------------------+
Students table:
+------+----------+---------------+
| id   | name     | department_id |
+------+----------+---------------+
| 23   | Alice    | 1             |
| 1    | Bob      | 7             |
| 5    | Jennifer | 13            |
| 2    | John     | 14            |
| 4    | Jasmine  | 77            |
| 3    | Steve    | 74            |
| 6    | Luis     | 1             |
| 8    | Jonathan | 7             |
| 7    | Daiana   | 33            |
| 11   | Madelynn | 1             |
+------+----------+---------------+
Output: 
+------+----------+
| id   | name     |
+------+----------+
| 2    | John     |
| 7    | Daiana   |
| 4    | Jasmine  |
| 3    | Steve    |
+------+----------+
Explanation: 
John, Daiana, Steve, and Jasmine are enrolled in departments 14, 33, 74, and 77 respectively. department 14, 33, 74, and 77 do not exist in the Departments table.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define schemas
departments_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

students_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department_id", IntegerType(), True)
])

# Create data
departments_data = [
    (1, 'Electrical Engineering'),
    (7, 'Computer Engineering'),
    (13, 'Bussiness Administration')
]

students_data = [
    (23, 'Alice', 1),
    (1, 'Bob', 7),
    (5, 'Jennifer', 13),
    (2, 'John', 14),
    (4, 'Jasmine', 77),
    (3, 'Steve', 74),
    (6, 'Luis', 1),
    (8, 'Jonathan', 7),
    (7, 'Daiana', 33),
    (11, 'Madelynn', 1)
]

# Create DataFrames
departments_df = spark.createDataFrame(departments_data, schema=departments_schema)
students_df = spark.createDataFrame(students_data, schema=students_schema)

# Show DataFrames
departments_df.show()
students_df.show()


+---+--------------------+
| id|                name|
+---+--------------------+
|  1|Electrical Engine...|
|  7|Computer Engineering|
| 13|Bussiness Adminis...|
+---+--------------------+

+---+--------+-------------+
| id|    name|department_id|
+---+--------+-------------+
| 23|   Alice|            1|
|  1|     Bob|            7|
|  5|Jennifer|           13|
|  2|    John|           14|
|  4| Jasmine|           77|
|  3|   Steve|           74|
|  6|    Luis|            1|
|  8|Jonathan|            7|
|  7|  Daiana|           33|
| 11|Madelynn|            1|
+---+--------+-------------+



In [0]:
students_df.alias('S')\
    .join(departments_df.alias('D'),col('S.department_id') == col('D.id'),'left').filter(col('D.name').isNull())\
        .select(col('S.id'),col('S.name')).show() 

+---+-------+
| id|   name|
+---+-------+
|  2|   John|
|  4|Jasmine|
|  3|  Steve|
|  7| Daiana|
+---+-------+



## (Premium) 1303. Find the Team Size
```
Table: Employee

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| employee_id   | int     |
| team_id       | int     |
+---------------+---------+
employee_id is the primary key (column with unique values) for this table.
Each row of this table contains the ID of each employee and their respective team.
 

Write a solution to find the team size of each of the employees.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Employee Table:
+-------------+------------+
| employee_id | team_id    |
+-------------+------------+
|     1       |     8      |
|     2       |     8      |
|     3       |     8      |
|     4       |     7      |
|     5       |     9      |
|     6       |     9      |
+-------------+------------+
Output: 
+-------------+------------+
| employee_id | team_size  |
+-------------+------------+
|     1       |     3      |
|     2       |     3      |
|     3       |     3      |
|     4       |     1      |
|     5       |     2      |
|     6       |     2      |
+-------------+------------+
Explanation: 
Employees with Id 1,2,3 are part of a team with team_id = 8.
Employee with Id 4 is part of a team with team_id = 7.
Employees with Id 5,6 are part of a team with team_id = 9.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Assuming you have a Spark session named 'spark'
data = [[1, 8], [2, 8], [3, 8], [4, 7], [5, 9], [6, 9]]

# Define schema
schema = StructType([
    StructField('employee_id', IntegerType(), True),
    StructField('team_id', IntegerType(), True)
])

# Create DataFrame
employee_df = spark.createDataFrame(data, schema)

# Show DataFrame
employee_df.show()


+-----------+-------+
|employee_id|team_id|
+-----------+-------+
|          1|      8|
|          2|      8|
|          3|      8|
|          4|      7|
|          5|      9|
|          6|      9|
+-----------+-------+



In [0]:
from pyspark.sql.functions import *
employee_df.alias('E1').join(employee_df.alias('E2'), col('E1.team_id') == col('E2.team_id'))\
    .groupBy(col('E1.employee_id')).agg(count('*').alias('team_size')).orderBy(col('employee_id')).show()

+-----------+---------+
|employee_id|team_size|
+-----------+---------+
|          1|        3|
|          2|        3|
|          3|        3|
|          4|        1|
|          5|        2|
|          6|        2|
+-----------+---------+



## (Premium) 512. Game Play Analysis II
```
Table: Activity

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| player_id    | int     |
| device_id    | int     |
| event_date   | date    |
| games_played | int     |
+--------------+---------+
(player_id, event_date) is the primary key (combination of columns with unique values) of this table.
This table shows the activity of players of some games.
Each row is a record of a player who logged in and played a number of games (possibly 0) before logging out on someday using some device.
 

Write a solution to report the device that is first logged in for each player.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Activity table:
+-----------+-----------+------------+--------------+
| player_id | device_id | event_date | games_played |
+-----------+-----------+------------+--------------+
| 1         | 2         | 2016-03-01 | 5            |
| 1         | 2         | 2016-05-02 | 6            |
| 2         | 3         | 2017-06-25 | 1            |
| 3         | 1         | 2016-03-02 | 0            |
| 3         | 4         | 2018-07-03 | 5            |
+-----------+-----------+------------+--------------+
Output: 
+-----------+-----------+
| player_id | device_id |
+-----------+-----------+
| 1         | 2         |
| 2         | 3         |
| 3         | 1         |
+-----------+-----------+
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

# Assuming you have a Spark session named 'spark'
data = [[1, 2, '2016-03-01', 5], [1, 2, '2016-05-02', 6], [2, 3, '2017-06-25', 1], [3, 1, '2016-03-02', 0], [3, 4, '2018-07-03', 5]]

# Define schema
schema = StructType([
    StructField('player_id', IntegerType(), True),
    StructField('device_id', IntegerType(), True),
    StructField('event_date', StringType(), True),
    StructField('games_played', IntegerType(), True)
])

# Create DataFrame
activity_df = spark.createDataFrame(data, schema)

# Show DataFrame
activity_df.show()


+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-05-02|           6|
|        2|        3|2017-06-25|           1|
|        3|        1|2016-03-02|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
windowOptions = Window.partitionBy(col('player_id')).orderBy(col('event_date'))
activity_df.withColumn('first_device',row_number().over(windowOptions)).filter(col('first_device') == 1)\
    .select(col('player_id'),col('device_id')).show()

+---------+---------+
|player_id|device_id|
+---------+---------+
|        1|        2|
|        2|        3|
|        3|        1|
+---------+---------+




## 184. Department Highest Salary
```
Table: Employee

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| id           | int     |
| name         | varchar |
| salary       | int     |
| departmentId | int     |
+--------------+---------+
id is the primary key (column with unique values) for this table.
departmentId is a foreign key (reference columns) of the ID from the Department table.
Each row of this table indicates the ID, name, and salary of an employee. It also contains the ID of their department.
 

Table: Department

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| id          | int     |
| name        | varchar |
+-------------+---------+
id is the primary key (column with unique values) for this table. It is guaranteed that department name is not NULL.
Each row of this table indicates the ID of a department and its name.
 

Write a solution to find employees who have the highest salary in each of the departments.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Employee table:
+----+-------+--------+--------------+
| id | name  | salary | departmentId |
+----+-------+--------+--------------+
| 1  | Joe   | 70000  | 1            |
| 2  | Jim   | 90000  | 1            |
| 3  | Henry | 80000  | 2            |
| 4  | Sam   | 60000  | 2            |
| 5  | Max   | 90000  | 1            |
+----+-------+--------+--------------+
Department table:
+----+-------+
| id | name  |
+----+-------+
| 1  | IT    |
| 2  | Sales |
+----+-------+
Output: 
+------------+----------+--------+
| Department | Employee | Salary |
+------------+----------+--------+
| IT         | Jim      | 90000  |
| Sales      | Henry    | 80000  |
| IT         | Max      | 90000  |
+------------+----------+--------+
Explanation: Max and Jim both have the highest salary in the IT department and Henry has the highest salary in the Sales department.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Assuming you already have a Spark session
# spark = SparkSession.builder.appName("example").getOrCreate()

# Define schemas for the DataFrames
employee_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("departmentId", IntegerType(), True)
])

department_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Create data for the DataFrames
employee_data = [
    (1, 'Joe', 70000, 1),
    (2, 'Jim', 90000, 1),
    (3, 'Henry', 80000, 2),
    (4, 'Sam', 60000, 2),
    (5, 'Max', 90000, 1)
]

department_data = [
    (1, 'IT'),
    (2, 'Sales')
]

# Create DataFrames
employee_df = spark.createDataFrame(employee_data, schema=employee_schema)
department_df = spark.createDataFrame(department_data, schema=department_schema)

# Show the DataFrames
employee_df.show()
department_df.show()


+---+-----+------+------------+
| id| name|salary|departmentId|
+---+-----+------+------------+
|  1|  Joe| 70000|           1|
|  2|  Jim| 90000|           1|
|  3|Henry| 80000|           2|
|  4|  Sam| 60000|           2|
|  5|  Max| 90000|           1|
+---+-----+------+------------+

+---+-----+
| id| name|
+---+-----+
|  1|   IT|
|  2|Sales|
+---+-----+



In [0]:
#Write a solution to find employees who have the highest salary in each of the departments.
#Return the result table in any order.
from pyspark.sql.window import Window
from pyspark.sql.functions import *

windowOptions  = Window.partitionBy(col('departmentId')).orderBy(col('salary').desc())
employee_df.alias('E').join(department_df.alias('D'), col('E.departmentId') == col('D.id'),'inner')\
    .withColumn('Rank',rank().over(windowOptions)).filter(col('Rank') == 1)\
        .select(col('D.name').alias('Department'),col('E.name').alias('Employee'),col('salary')).show()


+----------+--------+------+
|Department|Employee|salary|
+----------+--------+------+
|        IT|     Jim| 90000|
|        IT|     Max| 90000|
|     Sales|   Henry| 80000|
+----------+--------+------+



## (Premium) 1549. The Most Recent Orders for Each Product
```
Table: Customers

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| customer_id   | int     |
| name          | varchar |
+---------------+---------+
customer_id is the column with unique values for this table.
This table contains information about the customers.
 

Table: Orders

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| order_id      | int     |
| order_date    | date    |
| customer_id   | int     |
| product_id    | int     |
+---------------+---------+
order_id is the column with unique values for this table.
This table contains information about the orders made by customer_id.
There will be no product ordered by the same user more than once in one day.
 

Table: Products

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| product_id    | int     |
| product_name  | varchar |
| price         | int     |
+---------------+---------+
product_id is the column with unique values for this table.
This table contains information about the Products.
 

Write a solution to find the most recent order(s) of each product.

Return the result table ordered by product_name in ascending order and in case of a tie by the product_id in ascending order. If there still a tie, order them by order_id in ascending order.

The result format is in the following example.

 

Example 1:

Input: 
Customers table:
+-------------+-----------+
| customer_id | name      |
+-------------+-----------+
| 1           | Winston   |
| 2           | Jonathan  |
| 3           | Annabelle |
| 4           | Marwan    |
| 5           | Khaled    |
+-------------+-----------+
Orders table:
+----------+------------+-------------+------------+
| order_id | order_date | customer_id | product_id |
+----------+------------+-------------+------------+
| 1        | 2020-07-31 | 1           | 1          |
| 2        | 2020-07-30 | 2           | 2          |
| 3        | 2020-08-29 | 3           | 3          |
| 4        | 2020-07-29 | 4           | 1          |
| 5        | 2020-06-10 | 1           | 2          |
| 6        | 2020-08-01 | 2           | 1          |
| 7        | 2020-08-01 | 3           | 1          |
| 8        | 2020-08-03 | 1           | 2          |
| 9        | 2020-08-07 | 2           | 3          |
| 10       | 2020-07-15 | 1           | 2          |
+----------+------------+-------------+------------+
Products table:
+------------+--------------+-------+
| product_id | product_name | price |
+------------+--------------+-------+
| 1          | keyboard     | 120   |
| 2          | mouse        | 80    |
| 3          | screen       | 600   |
| 4          | hard disk    | 450   |
+------------+--------------+-------+
Output: 
+--------------+------------+----------+------------+
| product_name | product_id | order_id | order_date |
+--------------+------------+----------+------------+
| keyboard     | 1          | 6        | 2020-08-01 |
| keyboard     | 1          | 7        | 2020-08-01 |
| mouse        | 2          | 8        | 2020-08-03 |
| screen       | 3          | 3        | 2020-08-29 |
+--------------+------------+----------+------------+
Explanation: 
keyboard's most recent order is in 2020-08-01, it was ordered two times this day.
mouse's most recent order is in 2020-08-03, it was ordered only once this day.
screen's most recent order is in 2020-08-29, it was ordered only once this day.
The hard disk was never ordered and we do not include it in the result table.
```

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Define schemas for the DataFrames
customers_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True)
])

orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True)
])

products_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", IntegerType(), True)
])

# Create data for the DataFrames
customers_data = [
    (1, 'Winston'),
    (2, 'Jonathan'),
    (3, 'Annabelle'),
    (4, 'Marwan'),
    (5, 'Khaled')
]

orders_data = [
    (1, '2020-07-31', 1, 1),
    (2, '2020-07-30', 2, 2),
    (3, '2020-08-29', 3, 3),
    (4, '2020-07-29', 4, 1),
    (5, '2020-06-10', 1, 2),
    (6, '2020-08-01', 2, 1),
    (7, '2020-08-01', 3, 1),
    (8, '2020-08-03', 1, 2),
    (9, '2020-08-07', 2, 3),
    (10, '2020-07-15', 1, 2)
]

products_data = [
    (1, 'keyboard', 120),
    (2, 'mouse', 80),
    (3, 'screen', 600),
    (4, 'hard disk', 450)
]

# Create DataFrames
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)
products_df = spark.createDataFrame(products_data, schema=products_schema)

# Show the DataFrames
customers_df.show()
orders_df.show()
products_df.show()


+-----------+---------+
|customer_id|     name|
+-----------+---------+
|          1|  Winston|
|          2| Jonathan|
|          3|Annabelle|
|          4|   Marwan|
|          5|   Khaled|
+-----------+---------+

+--------+----------+-----------+----------+
|order_id|order_date|customer_id|product_id|
+--------+----------+-----------+----------+
|       1|2020-07-31|          1|         1|
|       2|2020-07-30|          2|         2|
|       3|2020-08-29|          3|         3|
|       4|2020-07-29|          4|         1|
|       5|2020-06-10|          1|         2|
|       6|2020-08-01|          2|         1|
|       7|2020-08-01|          3|         1|
|       8|2020-08-03|          1|         2|
|       9|2020-08-07|          2|         3|
|      10|2020-07-15|          1|         2|
+--------+----------+-----------+----------+

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|    keyboard|  120|
|         2|       mouse

In [0]:
# Write a solution to find the most recent order(s) of each product.

# Return the result table ordered by product_name in ascending order and in case of a tie by the product_id in ascending order. If there still a tie, order them by order_id in ascending order.
windowOptions = Window.partitionBy(col('O.product_id')).orderBy(col('order_date').desc())
orders_df.alias('O').join(products_df.alias('P'), col('O.product_id') == col('P.product_id'),'left')\
    .withColumn('recentOrder',rank().over(windowOptions)).filter(col('recentOrder')==1)\
        .select(col('product_name'),col('P.product_id'),col('O.order_id'),col('O.order_date')).show()


+------------+----------+--------+----------+
|product_name|product_id|order_id|order_date|
+------------+----------+--------+----------+
|    keyboard|         1|       6|2020-08-01|
|    keyboard|         1|       7|2020-08-01|
|       mouse|         2|       8|2020-08-03|
|      screen|         3|       3|2020-08-29|
+------------+----------+--------+----------+



## (Premium) 1532. The Most Recent Three Orders
```
Table: Customers

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| customer_id   | int     |
| name          | varchar |
+---------------+---------+
customer_id is the column with unique values for this table.
This table contains information about customers.
 

Table: Orders

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| order_id      | int     |
| order_date    | date    |
| customer_id   | int     |
| cost          | int     |
+---------------+---------+
order_id is the column with unique values for this table.
This table contains information about the orders made by customer_id.
Each customer has one order per day.
 

Write a solution to find the most recent three orders of each user. If a user ordered less than three orders, return all of their orders.

Return the result table ordered by customer_name in ascending order and in case of a tie by the customer_id in ascending order. If there is still a tie, order them by order_date in descending order.

The result format is in the following example.

 

Example 1:

Input: 
Customers table:
+-------------+-----------+
| customer_id | name      |
+-------------+-----------+
| 1           | Winston   |
| 2           | Jonathan  |
| 3           | Annabelle |
| 4           | Marwan    |
| 5           | Khaled    |
+-------------+-----------+
Orders table:
+----------+------------+-------------+------+
| order_id | order_date | customer_id | cost |
+----------+------------+-------------+------+
| 1        | 2020-07-31 | 1           | 30   |
| 2        | 2020-07-30 | 2           | 40   |
| 3        | 2020-07-31 | 3           | 70   |
| 4        | 2020-07-29 | 4           | 100  |
| 5        | 2020-06-10 | 1           | 1010 |
| 6        | 2020-08-01 | 2           | 102  |
| 7        | 2020-08-01 | 3           | 111  |
| 8        | 2020-08-03 | 1           | 99   |
| 9        | 2020-08-07 | 2           | 32   |
| 10       | 2020-07-15 | 1           | 2    |
+----------+------------+-------------+------+
Output: 
+---------------+-------------+----------+------------+
| customer_name | customer_id | order_id | order_date |
+---------------+-------------+----------+------------+
| Annabelle     | 3           | 7        | 2020-08-01 |
| Annabelle     | 3           | 3        | 2020-07-31 |
| Jonathan      | 2           | 9        | 2020-08-07 |
| Jonathan      | 2           | 6        | 2020-08-01 |
| Jonathan      | 2           | 2        | 2020-07-30 |
| Marwan        | 4           | 4        | 2020-07-29 |
| Winston       | 1           | 8        | 2020-08-03 |
| Winston       | 1           | 1        | 2020-07-31 |
| Winston       | 1           | 10       | 2020-07-15 |
+---------------+-------------+----------+------------+
Explanation: 
Winston has 4 orders, we discard the order of "2020-06-10" because it is the oldest order.
Annabelle has only 2 orders, we return them.
Jonathan has exactly 3 orders.
Marwan ordered only one time.
We sort the result table by customer_name in ascending order, by customer_id in ascending order, and by order_date in descending order in case of a tie.
 

Follow up: Could you write a general solution for the most recent n orders?
```

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Define schemas for the DataFrames
customers_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True)
])

orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("cost", IntegerType(), True)
])

# Create data for the DataFrames
customers_data = [
    (1, 'Winston'),
    (2, 'Jonathan'),
    (3, 'Annabelle'),
    (4, 'Marwan'),
    (5, 'Khaled')
]

orders_data = [
    (1, '2020-07-31', 1, 30),
    (2, '2020-07-30', 2, 40),
    (3, '2020-07-31', 3, 70),
    (4, '2020-07-29', 4, 100),
    (5, '2020-06-10', 1, 1010),
    (6, '2020-08-01', 2, 102),
    (7, '2020-08-01', 3, 111),
    (8, '2020-08-03', 1, 99),
    (9, '2020-08-07', 2, 32),
    (10, '2020-07-15', 1, 2)
]

# Create DataFrames
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)

# Show the DataFrames
customers_df.show()
orders_df.show()


+-----------+---------+
|customer_id|     name|
+-----------+---------+
|          1|  Winston|
|          2| Jonathan|
|          3|Annabelle|
|          4|   Marwan|
|          5|   Khaled|
+-----------+---------+

+--------+----------+-----------+----+
|order_id|order_date|customer_id|cost|
+--------+----------+-----------+----+
|       1|2020-07-31|          1|  30|
|       2|2020-07-30|          2|  40|
|       3|2020-07-31|          3|  70|
|       4|2020-07-29|          4| 100|
|       5|2020-06-10|          1|1010|
|       6|2020-08-01|          2| 102|
|       7|2020-08-01|          3| 111|
|       8|2020-08-03|          1|  99|
|       9|2020-08-07|          2|  32|
|      10|2020-07-15|          1|   2|
+--------+----------+-----------+----+



In [0]:
# Write a solution to find the most recent three orders of each user. If a user ordered less than three orders, return all of their orders.

# Return the result table ordered by customer_name in ascending order and in case of a tie by the customer_id in ascending order. If there is still a tie, order them by order_date in descending order.

windowOptions = Window.partitionBy(col('C.customer_id')).orderBy(col('order_date').desc())
orders_df.alias('O').join(customers_df.alias('C'), col('O.customer_id') == col('C.customer_id'),'left')\
    .withColumn('Recent3Orders',row_number().over(windowOptions)).filter(col('Recent3Orders').isin(1,2,3))\
        .select(col('name').alias('customer_name'),col('C.customer_id'),col('order_id'),col('order_date'))\
            .orderBy(col('customer_name'),col('order_date').desc()).show()


+-------------+-----------+--------+----------+
|customer_name|customer_id|order_id|order_date|
+-------------+-----------+--------+----------+
|    Annabelle|          3|       7|2020-08-01|
|    Annabelle|          3|       3|2020-07-31|
|     Jonathan|          2|       9|2020-08-07|
|     Jonathan|          2|       6|2020-08-01|
|     Jonathan|          2|       2|2020-07-30|
|       Marwan|          4|       4|2020-07-29|
|      Winston|          1|       8|2020-08-03|
|      Winston|          1|       1|2020-07-31|
|      Winston|          1|      10|2020-07-15|
+-------------+-----------+--------+----------+



## (Premium) 1831. Maximum Transaction Each Day
```
Table: Transactions

+----------------+----------+
| Column Name    | Type     |
+----------------+----------+
| transaction_id | int      |
| day            | datetime |
| amount         | int      |
+----------------+----------+
transaction_id is the column with unique values for this table.
Each row contains information about one transaction.
 

Write a solution to report the IDs of the transactions with the maximum amount on their respective day. If in one day there are multiple such transactions, return all of them.

Return the result table ordered by transaction_id in ascending order.

The result format is in the following example.

 

Example 1:

Input: 
Transactions table:
+----------------+--------------------+--------+
| transaction_id | day                | amount |
+----------------+--------------------+--------+
| 8              | 2021-4-3 15:57:28  | 57     |
| 9              | 2021-4-28 08:47:25 | 21     |
| 1              | 2021-4-29 13:28:30 | 58     |
| 5              | 2021-4-28 16:39:59 | 40     |
| 6              | 2021-4-29 23:39:28 | 58     |
+----------------+--------------------+--------+
Output: 
+----------------+
| transaction_id |
+----------------+
| 1              |
| 5              |
| 6              |
| 8              |
+----------------+
Explanation: 
"2021-4-3"  --> We have one transaction with ID 8, so we add 8 to the result table.
"2021-4-28" --> We have two transactions with IDs 5 and 9. The transaction with ID 5 has an amount of 40, while the transaction with ID 9 has an amount of 21. We only include the transaction with ID 5 as it has the maximum amount this day.
"2021-4-29" --> We have two transactions with IDs 1 and 6. Both transactions have the same amount of 58, so we include both in the result table.
We order the result table by transaction_id after collecting these IDs.
 

Follow up: Could you solve it without using the MAX() function?
```

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType

# Define schema for the DataFrame
transactions_schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("day", StringType(), True),
    StructField("amount", IntegerType(), True)
])

# Create data for the DataFrame
transactions_data = [
    (8, '2021-04-03 15:57:28', 57),
    (9, '2021-04-28 08:47:25', 21),
    (1, '2021-04-29 13:28:30', 58),
    (5, '2021-04-28 16:39:59', 40),
    (6, '2021-04-29 23:39:28', 58)
]

# Create DataFrame
transactions_df = spark.createDataFrame(transactions_data, schema=transactions_schema)

# Show the DataFrame
transactions_df.show()


+--------------+-------------------+------+
|transaction_id|                day|amount|
+--------------+-------------------+------+
|             8|2021-04-03 15:57:28|    57|
|             9|2021-04-28 08:47:25|    21|
|             1|2021-04-29 13:28:30|    58|
|             5|2021-04-28 16:39:59|    40|
|             6|2021-04-29 23:39:28|    58|
+--------------+-------------------+------+



In [0]:
# Write a solution to report the IDs of the transactions with the maximum amount on their respective day. If in one day there are multiple such transactions, return all of them.

# Return the result table ordered by transaction_id in ascending order.
df_with_date = transactions_df.withColumn('date', to_date('day', 'yyyy-MM-dd HH:mm:ss'))

windowOptions = Window.partitionBy(col('date')).orderBy(col('amount').desc())
df_with_date.withColumn('MaxTransactionOfTheDay',rank().over(windowOptions))\
    .filter(col('MaxTransactionOfTheDay') == 1).select(col('transaction_id'))\
        .orderBy(col('transaction_id')).show()

+--------------+
|transaction_id|
+--------------+
|             1|
|             5|
|             6|
|             8|
+--------------+



# Advanced Topics: Window Function and CTE

## (Premium) 1077. Project Employees III
```
Table: Project

+-------------+---------+
| Column Name | Type    |
+-------------+---------+
| project_id  | int     |
| employee_id | int     |
+-------------+---------+
(project_id, employee_id) is the primary key (combination of columns with unique values) of this table.
employee_id is a foreign key (reference column) to Employee table.
Each row of this table indicates that the employee with employee_id is working on the project with project_id.
 

Table: Employee

+------------------+---------+
| Column Name      | Type    |
+------------------+---------+
| employee_id      | int     |
| name             | varchar |
| experience_years | int     |
+------------------+---------+
employee_id is the primary key (column with unique values) of this table.
Each row of this table contains information about one employee.
 

Write a solution to report the most experienced employees in each project. In case of a tie, report all employees with the maximum number of experience years.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Project table:
+-------------+-------------+
| project_id  | employee_id |
+-------------+-------------+
| 1           | 1           |
| 1           | 2           |
| 1           | 3           |
| 2           | 1           |
| 2           | 4           |
+-------------+-------------+
Employee table:
+-------------+--------+------------------+
| employee_id | name   | experience_years |
+-------------+--------+------------------+
| 1           | Khaled | 3                |
| 2           | Ali    | 2                |
| 3           | John   | 3                |
| 4           | Doe    | 2                |
+-------------+--------+------------------+
Output: 
+-------------+---------------+
| project_id  | employee_id   |
+-------------+---------------+
| 1           | 1             |
| 1           | 3             |
| 2           | 1             |
+-------------+---------------+
Explanation: Both employees with id 1 and 3 have the most experience among the employees of the first project. For the second project, the employee with id 1 has the most experience.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Assuming you already have a Spark session
spark = SparkSession.builder.getOrCreate()

# Data for the project DataFrame
project_data = [(1, 1), (1, 2), (1, 3), (2, 1), (2, 4)]
project_schema = StructType([
    StructField('project_id', IntegerType(), False),
    StructField('employee_id', IntegerType(), False)
])
project_df = spark.createDataFrame(project_data, schema=project_schema)

# Data for the employee DataFrame
employee_data = [(1, 'Khaled', 3), (2, 'Ali', 2), (3, 'John', 3), (4, 'Doe', 2)]
employee_schema = StructType([
    StructField('employee_id', IntegerType(), False),
    StructField('name', StringType(), False),
    StructField('experience_years', IntegerType(), False)
])
employee_df = spark.createDataFrame(employee_data, schema=employee_schema)

# Show the DataFrames
project_df.show()
employee_df.show()


+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          2|
|         1|          3|
|         2|          1|
|         2|          4|
+----------+-----------+

+-----------+------+----------------+
|employee_id|  name|experience_years|
+-----------+------+----------------+
|          1|Khaled|               3|
|          2|   Ali|               2|
|          3|  John|               3|
|          4|   Doe|               2|
+-----------+------+----------------+



In [0]:
# Write a solution to report the most experienced employees in each project. In case of a tie, report all employees with the maximum number of experience years.

# Return the result table in any order.
from pyspark.sql.window import Window
from pyspark.sql.functions import *
windowOptions = Window.partitionBy(col('P.project_id')).orderBy(col('experience_years').desc())
employee_df.alias('E').join(project_df.alias('P'),col('E.employee_id') == col('P.employee_id'),'left')\
    .withColumn('maxExperience',rank().over(windowOptions)).filter(col('maxExperience') == 1)\
        .select(col('project_id'),col('E.employee_id')).show()

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          3|
|         2|          1|
+----------+-----------+




## (Premium) 1285. Find the Start and End Number of Continuous Ranges
```
Table: Logs

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| log_id        | int     |
+---------------+---------+
log_id is the column of unique values for this table.
Each row of this table contains the ID in a log Table.
 

Write a solution to find the start and end number of continuous ranges in the table Logs.

Return the result table ordered by start_id.

The result format is in the following example.

 

Example 1:

Input: 
Logs table:
+------------+
| log_id     |
+------------+
| 1          |
| 2          |
| 3          |
| 7          |
| 8          |
| 10         |
+------------+
Output: 
+------------+--------------+
| start_id   | end_id       |
+------------+--------------+
| 1          | 3            |
| 7          | 8            |
| 10         | 10           |
+------------+--------------+
Explanation: 
The result table should contain all ranges in table Logs.
From 1 to 3 is contained in the table.
From 4 to 6 is missing in the table
From 7 to 8 is contained in the table.
Number 9 is missing from the table.
Number 10 is contained in the table.
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Assuming you already have a Spark session
spark = SparkSession.builder.getOrCreate()

# Data for the logs DataFrame
logs_data = [(1,), (2,), (3,), (7,), (8,), (10,)]
logs_schema = StructType([
    StructField('log_id', IntegerType(), False)
])
logs_df = spark.createDataFrame(logs_data, schema=logs_schema)

# Show the DataFrame
logs_df.show()


+------+
|log_id|
+------+
|     1|
|     2|
|     3|
|     7|
|     8|
|    10|
+------+



In [0]:
# Write a solution to find the start and end number of continuous ranges in the table Logs.
# Return the result table ordered by start_id.

windowOptions = Window.orderBy(col('log_id'))
logs_df.withColumn('RN', (col('log_id') - row_number().over(windowOptions))).groupBy(col('RN'))\
    .agg(min(col('log_id')).alias('start_id'),max(col('log_id')).alias('end_id')).drop(col('RN')).show()

+--------+------+
|start_id|end_id|
+--------+------+
|       1|     3|
|       7|     8|
|      10|    10|
+--------+------+



## (Premium) 1596. The Most Frequently Ordered Products for Each Customer
```
Table: Customers

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| customer_id   | int     |
| name          | varchar |
+---------------+---------+
customer_id is the column with unique values for this table.
This table contains information about the customers.
 

Table: Orders

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| order_id      | int     |
| order_date    | date    |
| customer_id   | int     |
| product_id    | int     |
+---------------+---------+
order_id is the column with unique values for this table.
This table contains information about the orders made by customer_id.
No customer will order the same product more than once in a single day.
 

Table: Products

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| product_id    | int     |
| product_name  | varchar |
| price         | int     |
+---------------+---------+
product_id is the column with unique values for this table.
This table contains information about the products.
 

Write a solution to find the most frequently ordered product(s) for each customer.

The result table should have the product_id and product_name for each customer_id who ordered at least one order.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Customers table:
+-------------+-------+
| customer_id | name  |
+-------------+-------+
| 1           | Alice |
| 2           | Bob   |
| 3           | Tom   |
| 4           | Jerry |
| 5           | John  |
+-------------+-------+
Orders table:
+----------+------------+-------------+------------+
| order_id | order_date | customer_id | product_id |
+----------+------------+-------------+------------+
| 1        | 2020-07-31 | 1           | 1          |
| 2        | 2020-07-30 | 2           | 2          |
| 3        | 2020-08-29 | 3           | 3          |
| 4        | 2020-07-29 | 4           | 1          |
| 5        | 2020-06-10 | 1           | 2          |
| 6        | 2020-08-01 | 2           | 1          |
| 7        | 2020-08-01 | 3           | 3          |
| 8        | 2020-08-03 | 1           | 2          |
| 9        | 2020-08-07 | 2           | 3          |
| 10       | 2020-07-15 | 1           | 2          |
+----------+------------+-------------+------------+
Products table:
+------------+--------------+-------+
| product_id | product_name | price |
+------------+--------------+-------+
| 1          | keyboard     | 120   |
| 2          | mouse        | 80    |
| 3          | screen       | 600   |
| 4          | hard disk    | 450   |
+------------+--------------+-------+
Output: 
+-------------+------------+--------------+
| customer_id | product_id | product_name |
+-------------+------------+--------------+
| 1           | 2          | mouse        |
| 2           | 1          | keyboard     |
| 2           | 2          | mouse        |
| 2           | 3          | screen       |
| 3           | 3          | screen       |
| 4           | 1          | keyboard     |
+-------------+------------+--------------+
Explanation: 
Alice (customer 1) ordered the mouse three times and the keyboard one time, so the mouse is the most frequently ordered product for them.
Bob (customer 2) ordered the keyboard, the mouse, and the screen one time, so those are the most frequently ordered products for them.
Tom (customer 3) only ordered the screen (two times), so that is the most frequently ordered product for them.
Jerry (customer 4) only ordered the keyboard (one time), so that is the most frequently ordered product for them.
John (customer 5) did not order anything, so we do not include them in the result table.
```

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql import SparkSession

# Assuming you already have a Spark session
spark = SparkSession.builder.getOrCreate()

# Data for the customers DataFrame
customers_data = [(1, 'Alice'), (2, 'Bob'), (3, 'Tom'), (4, 'Jerry'), (5, 'John')]
customers_schema = StructType([
    StructField('customer_id', IntegerType(), False),
    StructField('name', StringType(), False)
])
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

# Data for the orders DataFrame
orders_data = [
    (1, '2020-07-31', 1, 1), (2, '2020-07-30', 2, 2), (3, '2020-08-29', 3, 3),
    (4, '2020-07-29', 4, 1), (5, '2020-06-10', 1, 2), (6, '2020-08-01', 2, 1),
    (7, '2020-08-01', 3, 3), (8, '2020-08-03', 1, 2), (9, '2020-08-07', 2, 3),
    (10, '2020-07-15', 1, 2)
]
orders_schema = StructType([
    StructField('order_id', IntegerType(), False),
    StructField('order_date', StringType(), False),
    StructField('customer_id', IntegerType(), False),
    StructField('product_id', IntegerType(), False)
])
orders_df = spark.createDataFrame(orders_data, schema=orders_schema)

# Data for the products DataFrame
products_data = [
    (1, 'keyboard', 120), (2, 'mouse', 80),
    (3, 'screen', 600), (4, 'hard disk', 450)
]
products_schema = StructType([
    StructField('product_id', IntegerType(), False),
    StructField('product_name', StringType(), False),
    StructField('price', IntegerType(), False)
])
products_df = spark.createDataFrame(products_data, schema=products_schema)

# Show the DataFrames
customers_df.show()
orders_df.show()
products_df.show()


+-----------+-----+
|customer_id| name|
+-----------+-----+
|          1|Alice|
|          2|  Bob|
|          3|  Tom|
|          4|Jerry|
|          5| John|
+-----------+-----+

+--------+----------+-----------+----------+
|order_id|order_date|customer_id|product_id|
+--------+----------+-----------+----------+
|       1|2020-07-31|          1|         1|
|       2|2020-07-30|          2|         2|
|       3|2020-08-29|          3|         3|
|       4|2020-07-29|          4|         1|
|       5|2020-06-10|          1|         2|
|       6|2020-08-01|          2|         1|
|       7|2020-08-01|          3|         3|
|       8|2020-08-03|          1|         2|
|       9|2020-08-07|          2|         3|
|      10|2020-07-15|          1|         2|
+--------+----------+-----------+----------+

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|    keyboard|  120|
|         2|       mouse|   80|
|         3|      screen|  6

In [0]:
windowOptions = Window.partitionBy(col('customer_id')).orderBy(col('frequency').desc())
orders_df.alias('O').join(products_df.alias('P'),col('O.product_id') == col('P.product_id'))\
    .groupBy(col('customer_id'),col('O.product_id'),col('P.product_name')).agg(count('*')\
        .alias('frequency')).withColumn('RN',rank().over(windowOptions)).filter(col('RN') == 1).drop(col('RN'),col('frequency')).show()

+-----------+----------+------------+
|customer_id|product_id|product_name|
+-----------+----------+------------+
|          1|         2|       mouse|
|          2|         1|    keyboard|
|          2|         2|       mouse|
|          2|         3|      screen|
|          3|         3|      screen|
|          4|         1|    keyboard|
+-----------+----------+------------+




## (Premium) 1709. Biggest Window Between Visits
```
Table: UserVisits

+-------------+------+
| Column Name | Type |
+-------------+------+
| user_id     | int  |
| visit_date  | date |
+-------------+------+
This table does not have a primary key, it might contain duplicate rows.
This table contains logs of the dates that users visited a certain retailer.
 

Assume today's date is '2021-1-1'.

Write a solution that will, for each user_id, find out the largest window of days between each visit and the one right after it (or today if you are considering the last visit).

Return the result table ordered by user_id.

The query result format is in the following example.

 

Example 1:

Input: 
UserVisits table:
+---------+------------+
| user_id | visit_date |
+---------+------------+
| 1       | 2020-11-28 |
| 1       | 2020-10-20 |
| 1       | 2020-12-3  |
| 2       | 2020-10-5  |
| 2       | 2020-12-9  |
| 3       | 2020-11-11 |
+---------+------------+
Output: 
+---------+---------------+
| user_id | biggest_window|
+---------+---------------+
| 1       | 39            |
| 2       | 65            |
| 3       | 51            |
+---------+---------------+
Explanation: 
For the first user, the windows in question are between dates:
    - 2020-10-20 and 2020-11-28 with a total of 39 days. 
    - 2020-11-28 and 2020-12-3 with a total of 5 days. 
    - 2020-12-3 and 2021-1-1 with a total of 29 days.
Making the biggest window the one with 39 days.
For the second user, the windows in question are between dates:
    - 2020-10-5 and 2020-12-9 with a total of 65 days.
    - 2020-12-9 and 2021-1-1 with a total of 23 days.
Making the biggest window the one with 65 days.
For the third user, the only window in question is between dates 2020-11-11 and 2021-1-1 with a total of 51 days.
```

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType
from pyspark.sql import SparkSession

# Assuming you already have a Spark session
spark = SparkSession.builder.getOrCreate()

# Data for the user_visits DataFrame
user_visits_data = [
    (1, '2020-11-28'), (1, '2020-10-20'), (1, '2020-12-03'),
    (2, '2020-10-05'), (2, '2020-12-09'), (3, '2020-11-11')
]

# Convert date strings to actual DateType

user_visits_schema = StructType([
    StructField('user_id', IntegerType(), False),
    StructField('visit_date', StringType(), False)
])

# Create the DataFrame
user_visits_df = spark.createDataFrame(user_visits_data, schema=user_visits_schema)

# Show the DataFrame
user_visits_df.show()


+-------+----------+
|user_id|visit_date|
+-------+----------+
|      1|2020-11-28|
|      1|2020-10-20|
|      1|2020-12-03|
|      2|2020-10-05|
|      2|2020-12-09|
|      3|2020-11-11|
+-------+----------+



In [0]:
# Assume today's date is '2021-1-1'.

# Write a solution that will, for each user_id, find out the largest window of days between each visit and the one right after it (or today if you are considering the last visit).

# Return the result table ordered by user_id.

windowOptions = Window.partitionBy(col('user_id')).orderBy(col('visit_date'))
user_visits_df.withColumn('visit_date2', lead(col('visit_date'),1,'2021-01-01').over(windowOptions))\
    .withColumn('frequency',abs(datediff(col('visit_date'),col('visit_date2')))).groupBy(col('user_id'))\
        .agg(max(col('frequency')).alias('biggest_window'))\
        .show()

+-------+--------------+
|user_id|biggest_window|
+-------+--------------+
|      1|            39|
|      2|            65|
|      3|            51|
+-------+--------------+



## (Premium) 1270. All People Report to the Given Manager
```
Table: Employees

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| employee_id   | int     |
| employee_name | varchar |
| manager_id    | int     |
+---------------+---------+
employee_id is the column of unique values for this table.
Each row of this table indicates that the employee with ID employee_id and name employee_name reports his work to his/her direct manager with manager_id
The head of the company is the employee with employee_id = 1.
 

Write a solution to find employee_id of all employees that directly or indirectly report their work to the head of the company.

The indirect relation between managers will not exceed three managers as the company is small.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Employees table:
+-------------+---------------+------------+
| employee_id | employee_name | manager_id |
+-------------+---------------+------------+
| 1           | Boss          | 1          |
| 3           | Alice         | 3          |
| 2           | Bob           | 1          |
| 4           | Daniel        | 2          |
| 7           | Luis          | 4          |
| 8           | Jhon          | 3          |
| 9           | Angela        | 8          |
| 77          | Robert        | 1          |
+-------------+---------------+------------+
Output: 
+-------------+
| employee_id |
+-------------+
| 2           |
| 77          |
| 4           |
| 7           |
+-------------+
Explanation: 
The head of the company is the employee with employee_id 1.
The employees with employee_id 2 and 77 report their work directly to the head of the company.
The employee with employee_id 4 reports their work indirectly to the head of the company 4 --> 2 --> 1. 
The employee with employee_id 7 reports their work indirectly to the head of the company 7 --> 4 --> 2 --> 1.
The employees with employee_id 3, 8, and 9 do not report their work to the head of the company directly or indirectly. 
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Create a Spark session
spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType([
    StructField("employee_id", IntegerType(), True),
    StructField("employee_name", StringType(), True),
    StructField("manager_id", IntegerType(), True)
])

# Create the DataFrame
data = [
    (1, 'Boss', 1), 
    (3, 'Alice', 3), 
    (2, 'Bob', 1), 
    (4, 'Daniel', 2), 
    (7, 'Luis', 4), 
    (8, 'John', 3), 
    (9, 'Angela', 8), 
    (77, 'Robert', 1)
]

employees_df = spark.createDataFrame(data, schema)
employees_df.show()


+-----------+-------------+----------+
|employee_id|employee_name|manager_id|
+-----------+-------------+----------+
|          1|         Boss|         1|
|          3|        Alice|         3|
|          2|          Bob|         1|
|          4|       Daniel|         2|
|          7|         Luis|         4|
|          8|         John|         3|
|          9|       Angela|         8|
|         77|       Robert|         1|
+-----------+-------------+----------+



In [0]:
# Write a solution to find employee_id of all employees that directly or indirectly report their work to the head of the company.

# The indirect relation between managers will not exceed three managers as the company is small.

# Return the result table in any order.


df = employees_df.alias('T1').join(employees_df.alias('T2'), col('T1.employee_id') == col('T2.employee_id'),'inner')\
    .filter((col('T1.manager_id') == 1) & (col('T1.employee_id') != 1)).select(col('T1.employee_id'))

df2 = df.alias('T3').join(employees_df.alias('T4'), col('T3.employee_id') == col('T4.manager_id'),'inner' )\
    .select(col('T4.employee_id')).union(df)

df2.alias('T5').join(employees_df.alias('T6'), col('T5.employee_id') == col('T6.manager_id'),'inner').select(col('T6.employee_id'))\
    .union(df2).distinct().show()

+-----------+
|employee_id|
+-----------+
|          4|
|          7|
|         77|
|          2|
+-----------+



## (Premium) 1412. Find the Quiet Students in All Exams
```
Table: Student

+---------------------+---------+
| Column Name         | Type    |
+---------------------+---------+
| student_id          | int     |
| student_name        | varchar |
+---------------------+---------+
student_id is the primary key (column with unique values) for this table.
student_name is the name of the student.
 

Table: Exam

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| exam_id       | int     |
| student_id    | int     |
| score         | int     |
+---------------+---------+
(exam_id, student_id) is the primary key (combination of columns with unique values) for this table.
Each row of this table indicates that the student with student_id had a score points in the exam with id exam_id.
 

A quiet student is the one who took at least one exam and did not score the highest or the lowest score.

Write a solution to report the students (student_id, student_name) being quiet in all exams. Do not return the student who has never taken any exam.

Return the result table ordered by student_id.

The result format is in the following example.

 

Example 1:

Input: 
Student table:
+-------------+---------------+
| student_id  | student_name  |
+-------------+---------------+
| 1           | Daniel        |
| 2           | Jade          |
| 3           | Stella        |
| 4           | Jonathan      |
| 5           | Will          |
+-------------+---------------+
Exam table:
+------------+--------------+-----------+
| exam_id    | student_id   | score     |
+------------+--------------+-----------+
| 10         |     1        |    70     |
| 10         |     2        |    80     |
| 10         |     3        |    90     |
| 20         |     1        |    80     |
| 30         |     1        |    70     |
| 30         |     3        |    80     |
| 30         |     4        |    90     |
| 40         |     1        |    60     |
| 40         |     2        |    70     |
| 40         |     4        |    80     |
+------------+--------------+-----------+
Output: 
+-------------+---------------+
| student_id  | student_name  |
+-------------+---------------+
| 2           | Jade          |
+-------------+---------------+
Explanation: 
For exam 1: Student 1 and 3 hold the lowest and high scores respectively.
For exam 2: Student 1 hold both highest and lowest score.
For exam 3 and 4: Studnet 1 and 4 hold the lowest and high scores respectively.
Student 2 and 5 have never got the highest or lowest in any of the exams.
Since student 5 is not taking any exam, he is excluded from the result.
So, we only return the information of Student 2.
```

In [0]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Create student DataFrame
student_data = [(1, 'Daniel'), (2, 'Jade'), (3, 'Stella'), (4, 'Jonathan'), (5, 'Will')]
student_schema = ["student_id", "student_name"]
student_df = spark.createDataFrame(student_data, schema=student_schema)

# Create exam DataFrame
exam_data = [(10, 1, 70), (10, 2, 80), (10, 3, 90), (20, 1, 80), (30, 1, 70), (30, 3, 80), (30, 4, 90), (40, 1, 60), (40, 2, 70), (40, 4, 80)]
exam_schema = ["exam_id", "student_id", "score"]
exam_df = spark.createDataFrame(exam_data, schema=exam_schema)

# Show the DataFrames
student_df.show()
exam_df.show()


+----------+------------+
|student_id|student_name|
+----------+------------+
|         1|      Daniel|
|         2|        Jade|
|         3|      Stella|
|         4|    Jonathan|
|         5|        Will|
+----------+------------+

+-------+----------+-----+
|exam_id|student_id|score|
+-------+----------+-----+
|     10|         1|   70|
|     10|         2|   80|
|     10|         3|   90|
|     20|         1|   80|
|     30|         1|   70|
|     30|         3|   80|
|     30|         4|   90|
|     40|         1|   60|
|     40|         2|   70|
|     40|         4|   80|
+-------+----------+-----+



In [0]:
# A quiet student is the one who took at least one exam and did not score the highest or the lowest score.

# Write a solution to report the students (student_id, student_name) being quiet in all exams. Do not return the student who has never taken any exam.

# Return the result table ordered by student_id.

from pyspark.sql.window import Window
from pyspark.sql.functions import *


windowOptions1 = Window.partitionBy(col('exam_id')).orderBy(col('score').asc())
windowOptions2 = Window.partitionBy(col('exam_id')).orderBy(col('score').desc())

df2 = exam_df.alias('E').join(student_df.alias('S'), col('E.student_id') == col('S.student_id'),'left')\
    .withColumn('R1',rank().over(windowOptions1)).withColumn('R2',rank().over(windowOptions2)).drop(col('s.student_id'))

df3 = df2.filter( (col('R1') ==1) | (col('R2') == 1)).select(col('student_id'))

filtered_student_ids = df3.select('student_id').rdd.flatMap(lambda x: x).collect()
df2.filter(~col('student_id').isin(filtered_student_ids)).select(col('student_id'),col('student_name')).distinct().show()


+----------+------------+
|student_id|student_name|
+----------+------------+
|         2|        Jade|
+----------+------------+



## (Premium) 1767. Find the Subtasks That Did Not Execute
```
Table: Tasks

+----------------+---------+
| Column Name    | Type    |
+----------------+---------+
| task_id        | int     |
| subtasks_count | int     |
+----------------+---------+
task_id is the column with unique values for this table.
Each row in this table indicates that task_id was divided into subtasks_count subtasks labeled from 1 to subtasks_count.
It is guaranteed that 2 <= subtasks_count <= 20.
 

Table: Executed

+---------------+---------+
| Column Name   | Type    |
+---------------+---------+
| task_id       | int     |
| subtask_id    | int     |
+---------------+---------+
(task_id, subtask_id) is the combination of columns with unique values for this table.
Each row in this table indicates that for the task task_id, the subtask with ID subtask_id was executed successfully.
It is guaranteed that subtask_id <= subtasks_count for each task_id.
 

Write a solution to report the IDs of the missing subtasks for each task_id.

Return the result table in any order.

The result format is in the following example.

 

Example 1:

Input: 
Tasks table:
+---------+----------------+
| task_id | subtasks_count |
+---------+----------------+
| 1       | 3              |
| 2       | 2              |
| 3       | 4              |
+---------+----------------+
Executed table:
+---------+------------+
| task_id | subtask_id |
+---------+------------+
| 1       | 2          |
| 3       | 1          |
| 3       | 2          |
| 3       | 3          |
| 3       | 4          |
+---------+------------+
Output: 
+---------+------------+
| task_id | subtask_id |
+---------+------------+
| 1       | 1          |
| 1       | 3          |
| 2       | 1          |
| 2       | 2          |
+---------+------------+
Explanation: 
Task 1 was divided into 3 subtasks (1, 2, 3). Only subtask 2 was executed successfully, so we include (1, 1) and (1, 3) in the answer.
Task 2 was divided into 2 subtasks (1, 2). No subtask was executed successfully, so we include (2, 1) and (2, 2) in the answer.
Task 3 was divided into 4 subtasks (1, 2, 3, 4). All of the subtasks were executed successfully.

```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit

# Initialize Spark session
spark = SparkSession.builder.appName("tasks_example").getOrCreate()

# Create tasks DataFrame
tasks_data = [(1, 3), (2, 2), (3, 4)]
tasks_schema = ["task_id", "subtasks_count"]
tasks_df = spark.createDataFrame(tasks_data, schema=tasks_schema)

# Create executed DataFrame
executed_data = [(1, 2), (3, 1), (3, 2), (3, 3), (3, 4)]
executed_schema = ["task_id", "subtask_id"]
executed_df = spark.createDataFrame(executed_data, schema=executed_schema)

tasks_df.show()
executed_df.show()

+-------+--------------+
|task_id|subtasks_count|
+-------+--------------+
|      1|             3|
|      2|             2|
|      3|             4|
+-------+--------------+

+-------+----------+
|task_id|subtask_id|
+-------+----------+
|      1|         2|
|      3|         1|
|      3|         2|
|      3|         3|
|      3|         4|
+-------+----------+



In [0]:
tasks_with_subtasks_df = tasks_df.withColumn("subtask_id", explode(sequence(lit(1), col("subtasks_count"))))

tasks_with_subtasks_df.alias('S').join(executed_df.alias('E'), (col('S.task_id') == col('E.task_id')) & \
     (col('S.subtask_id') == col('E.subtask_id')),'left').filter(col('E.subtask_id').isNull()) \
    .select(col('S.task_id'), col('S.subtask_id')).show()

+-------+----------+
|task_id|subtask_id|
+-------+----------+
|      1|         1|
|      1|         3|
|      2|         1|
|      2|         2|
+-------+----------+



## (Premium) 1225. Report Contiguous Dates
```
Table: Failed

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| fail_date    | date    |
+--------------+---------+
fail_date is the primary key (column with unique values) for this table.
This table contains the days of failed tasks.
 

Table: Succeeded

+--------------+---------+
| Column Name  | Type    |
+--------------+---------+
| success_date | date    |
+--------------+---------+
success_date is the primary key (column with unique values) for this table.
This table contains the days of succeeded tasks.
 

A system is running one task every day. Every task is independent of the previous tasks. The tasks can fail or succeed.

Write a solution to report the period_state for each continuous interval of days in the period from 2019-01-01 to 2019-12-31.

period_state is 'failed' if tasks in this interval failed or 'succeeded' if tasks in this interval succeeded. Interval of days are retrieved as start_date and end_date.

Return the result table ordered by start_date.

The result format is in the following example.

 

Example 1:

Input: 
Failed table:
+-------------------+
| fail_date         |
+-------------------+
| 2018-12-28        |
| 2018-12-29        |
| 2019-01-04        |
| 2019-01-05        |
+-------------------+
Succeeded table:
+-------------------+
| success_date      |
+-------------------+
| 2018-12-30        |
| 2018-12-31        |
| 2019-01-01        |
| 2019-01-02        |
| 2019-01-03        |
| 2019-01-06        |
+-------------------+
Output: 
+--------------+--------------+--------------+
| period_state | start_date   | end_date     |
+--------------+--------------+--------------+
| succeeded    | 2019-01-01   | 2019-01-03   |
| failed       | 2019-01-04   | 2019-01-05   |
| succeeded    | 2019-01-06   | 2019-01-06   |
+--------------+--------------+--------------+
Explanation: 
The report ignored the system state in 2018 as we care about the system in the period 2019-01-01 to 2019-12-31.
From 2019-01-01 to 2019-01-03 all tasks succeeded and the system state was "succeeded".
From 2019-01-04 to 2019-01-05 all tasks failed and the system state was "failed".
From 2019-01-06 to 2019-01-06 all tasks succeeded and the system state was "succeeded".
```

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType

# Initialize Spark session
spark = SparkSession.builder.appName("FailuresAndSuccesses").getOrCreate()

# Define schemas
failed_schema = StructType([StructField("fail_date", StringType(), True)])
succeeded_schema = StructType([StructField("success_date", StringType(), True)])

# Create data
failed_data = [['2018-12-28'], ['2018-12-29'], ['2019-01-04'], ['2019-01-05']]
succeeded_data = [['2018-12-30'], ['2018-12-31'], ['2019-01-01'], ['2019-01-02'], ['2019-01-03'], ['2019-01-06']]

# Create PySpark DataFrames
failed_spark_df = spark.createDataFrame(failed_data, schema=failed_schema)
succeeded_spark_df = spark.createDataFrame(succeeded_data, schema=succeeded_schema)

# Show the PySpark DataFrames
failed_spark_df.show()
succeeded_spark_df.show()


+----------+
| fail_date|
+----------+
|2018-12-28|
|2018-12-29|
|2019-01-04|
|2019-01-05|
+----------+

+------------+
|success_date|
+------------+
|  2018-12-30|
|  2018-12-31|
|  2019-01-01|
|  2019-01-02|
|  2019-01-03|
|  2019-01-06|
+------------+



In [0]:
# A system is running one task every day. Every task is independent of the previous tasks. The tasks can fail or succeed.

# Write a solution to report the period_state for each continuous interval of days in the period from 2019-01-01 to 2019-12-31.

# period_state is 'failed' if tasks in this interval failed or 'succeeded' if tasks in this interval succeeded. Interval of days are retrieved as start_date and end_date.

# Return the result table ordered by start_date.
from pyspark.sql.functions import *
from pyspark.sql.window import Window
df1 = succeeded_spark_df.withColumn('success_date', col('success_date').cast('date'))\
    .select(col('success_date'),lit('Succeeded').alias('Succeeded'))
df2 = failed_spark_df.withColumn('fail_date', col('fail_date').cast('date'))\
    .select(col('fail_date'),lit('failed').alias('Failed'))

df2 = df1.union(df2).filter(col('success_date').between('2019-01-01','2019-12-31')).orderBy(col('success_date'))

windowOptions = Window.partitionBy(col('Succeeded')).orderBy(col('success_date'))
df2.withColumn('RN',row_number().over(windowOptions)) \
    .withColumn('dateGroup', date_sub(col('success_date'),col('RN'))).groupBy(col('dateGroup'))\
        .agg(max(col('Succeeded')).alias('period_state'), min(col('success_date')).alias('start_date'), \
            max(col('success_date')).alias('end_date')).drop(col('dateGroup')).orderBy(col('start_date')).show()


+------------+----------+----------+
|period_state|start_date|  end_date|
+------------+----------+----------+
|   Succeeded|2019-01-01|2019-01-03|
|      failed|2019-01-04|2019-01-05|
|   Succeeded|2019-01-06|2019-01-06|
+------------+----------+----------+

