In [0]:
from pyspark.sql import *
from pyspark.sql.functions import count

In [0]:
data = [
    (1, 1, 0),
    (1, 1, 1),
    (1, 1, 2),
    (1, 2, 3),
    (1, 2, 4),
    (2, 1, 5),
    (2, 1, 6)
]
columns = ["actor_id", "director_id", "timestamp"]

# Create the DataFrame
df = spark.createDataFrame(data, columns)
df.show()

+--------+-----------+---------+
|actor_id|director_id|timestamp|
+--------+-----------+---------+
|       1|          1|        0|
|       1|          1|        1|
|       1|          1|        2|
|       1|          2|        3|
|       1|          2|        4|
|       2|          1|        5|
|       2|          1|        6|
+--------+-----------+---------+



In [0]:
from pyspark.sql.functions import count

# Group by actor_id and director_id, calculate the count, and filter
df = df.groupBy("actor_id", "director_id") \
       .agg(count("*").alias("count")) \
       .filter("count > 2")  # OR use filter(F.col("count") > 2)

# Show the result
df.show()


+--------+-----------+-----+
|actor_id|director_id|count|
+--------+-----------+-----+
|       1|          1|    3|
+--------+-----------+-----+



#### Write an SPARK query to find all the authors that viewed at least one of their own articles, sorted in ascending order by their id.

In [0]:
data = [
    (1, 3, 5, "2019-08-01"),
    (1, 3, 6, "2019-08-02"),
    (2, 7, 7, "2019-08-01"),
    (2, 7, 6, "2019-08-02"),
    (4, 7, 1, "2019-07-22"),
    (3, 4, 4, "2019-07-21"),
    (3, 4, 4, "2019-07-21"),
]

columns = ["article_id", "author_id", "viewer_id", "view_date"]

# Create the DataFrame
views_df = spark.createDataFrame(data, columns)
views_df.display()

article_id,author_id,viewer_id,view_date
1,3,5,2019-08-01
1,3,6,2019-08-02
2,7,7,2019-08-01
2,7,6,2019-08-02
4,7,1,2019-07-22
3,4,4,2019-07-21
3,4,4,2019-07-21


In [0]:
views_df = views_df.select("author_id").filter("author_id == viewer_id")
views_df.display()


author_id
7
4
4


**Write an SPark query to find the average selling price for each product.

 average_price should be rounded to 2 decimal places.

 The query result format is in the following example:**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("Prices and UnitsSold").getOrCreate()

# Define the schema for the Prices table
prices_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True),
    StructField("price", IntegerType(), True)
])

# Convert string dates to datetime.date objects
prices_data = [
    (1, datetime.strptime("2019-02-17", "%Y-%m-%d").date(), datetime.strptime("2019-02-28", "%Y-%m-%d").date(), 5),
    (1, datetime.strptime("2019-03-01", "%Y-%m-%d").date(), datetime.strptime("2019-03-22", "%Y-%m-%d").date(), 20),
    (2, datetime.strptime("2019-02-01", "%Y-%m-%d").date(), datetime.strptime("2019-02-20", "%Y-%m-%d").date(), 15),
    (2, datetime.strptime("2019-02-21", "%Y-%m-%d").date(), datetime.strptime("2019-03-31", "%Y-%m-%d").date(), 30)
]

# Create Prices DataFrame
prices_df = spark.createDataFrame(prices_data, schema=prices_schema)

# Show the DataFrame
prices_df.show()

# Define the schema for the UnitsSold table
units_sold_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("purchase_date", DateType(), True),
    StructField("units", IntegerType(), True)
])

# Convert string dates to datetime.date objects
units_sold_data = [
    (1, datetime.strptime("2019-02-25", "%Y-%m-%d").date(), 100),
    (1, datetime.strptime("2019-03-01", "%Y-%m-%d").date(), 15),
    (2, datetime.strptime("2019-02-10", "%Y-%m-%d").date(), 200),
    (2, datetime.strptime("2019-03-22", "%Y-%m-%d").date(), 30)
]

# Create UnitsSold DataFrame
units_sold_df = spark.createDataFrame(units_sold_data, schema=units_sold_schema)

# Show the DataFrame
units_sold_df.show()


+----------+----------+----------+-----+
|product_id|start_date|  end_date|price|
+----------+----------+----------+-----+
|         1|2019-02-17|2019-02-28|    5|
|         1|2019-03-01|2019-03-22|   20|
|         2|2019-02-01|2019-02-20|   15|
|         2|2019-02-21|2019-03-31|   30|
+----------+----------+----------+-----+

+----------+-------------+-----+
|product_id|purchase_date|units|
+----------+-------------+-----+
|         1|   2019-02-25|  100|
|         1|   2019-03-01|   15|
|         2|   2019-02-10|  200|
|         2|   2019-03-22|   30|
+----------+-------------+-----+



In [0]:
from pyspark.sql.functions import col, sum

In [0]:
df_new=prices_df.join(units_sold_df, on="product_id" ).filter((col("purchase_date")>col("start_date")) & (col("purchase_date")<col("end_date")))
df_new=df_new.groupBy("product_id").agg(sum(col("units")*col("price"))/sum("units"))
df_new.show()

+----------+-----------------------------------+
|product_id|(sum((units * price)) / sum(units))|
+----------+-----------------------------------+
|         1|                                5.0|
|         2|                 16.956521739130434|
+----------+-----------------------------------+



# Write a Pyspark solution to output big countries' name, population and area.

In [0]:
world_schema = StructType([
    StructField("name", StringType(), True),
    StructField("continent", StringType(), True),
    StructField("area", IntegerType(), True),
    StructField("population", IntegerType(), True),
    StructField("gdp", IntegerType(), True)
])

# Data for the World table
world_data = [
    ("Afghanistan", "Asia", 652230, 25500100, 20343000),
    ("Albania", "Europe", 28748, 2831741, 12960000),
    ("Algeria", "Africa", 2381741, 37100000, 188681000),
    ("Andorra", "Europe", 468, 78115, 3712000),
    ("Angola", "Africa", 1246700, 20609294, 100990000)
]

# Create World DataFrame
world_df = spark.createDataFrame(world_data, schema=world_schema)

# Show the DataFrame
world_df.show()

+-----------+---------+-------+----------+---------+
|       name|continent|   area|population|      gdp|
+-----------+---------+-------+----------+---------+
|Afghanistan|     Asia| 652230|  25500100| 20343000|
|    Albania|   Europe|  28748|   2831741| 12960000|
|    Algeria|   Africa|2381741|  37100000|188681000|
|    Andorra|   Europe|    468|     78115|  3712000|
|     Angola|   Africa|1246700|  20609294|100990000|
+-----------+---------+-------+----------+---------+



In [0]:
world_df=world_df.select("name","population").filter((col("area")>3000000) | (col("population")>25000000)).show()

+-----------+----------+
|       name|population|
+-----------+----------+
|Afghanistan|  25500100|
|    Algeria|  37100000|
+-----------+----------+



 Can you write a SQL query to find the biggest number, which only appears once.

In [0]:
from pyspark.sql.functions import col

# Sample data
data = [(8,), (8,), (3,), (3,), (1,), (4,), (5,), (6,)]
columns = ["num"]

# Creating DataFrame
df = spark.createDataFrame(data, columns)

In [0]:
df = df.groupBy("num").agg(count("*").alias("count")).filter(col("count")==1).orderBy("num",ascending=False).limit(1)
df.show()

+---+-----+
|num|count|
+---+-----+
|  6|    1|
+---+-----+



There is a table courses with columns: student and class

-- Please list out all classes which have more than or equal to 5 students.


In [0]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("CreateDataFrameExample").getOrCreate()

# Define the data as a list of tuples
data = [
    ("A", "Math"),
    ("B", "English"),
    ("C", "Math"),
    ("D", "Biology"),
    ("E", "Math"),
    ("F", "Computer"),
    ("G", "Math"),
    ("H", "Math"),
    ("I", "Math"),
]

# Define the schema (column names)
columns = ["student", "class"]

# Create the DataFrame
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show()


+-------+--------+
|student|   class|
+-------+--------+
|      A|    Math|
|      B| English|
|      C|    Math|
|      D| Biology|
|      E|    Math|
|      F|Computer|
|      G|    Math|
|      H|    Math|
|      I|    Math|
+-------+--------+



In [0]:
df.groupBy("class").agg(count("*").alias("count")).filter("count>5").show()

+-----+-----+
|class|count|
+-----+-----+
| Math|    6|
+-----+-----+



Write a SQL query for a report that provides the following information for each person in the Person table,
-- regardless if there is an address for each of those people:


In [0]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("CreateDataFrameExample").getOrCreate()

# Data for the Person table
person_data = [
    (1, "John", "Doe"),
    (2, "Jane", "Smith"),
    (3, "Sam", "Brown"),
    (4, "Lisa", "White")
]

# Schema for the Person table
person_columns = ["PersonId", "FirstName", "LastName"]

# Create Person DataFrame
person_df = spark.createDataFrame(person_data, person_columns)

# Data for the Address table
address_data = [
    (1, 1, "New York", "NY"),
    (2, 2, "Los Angeles", "CA"),
    (3, 4, "Chicago", "IL")
]

# Schema for the Address table
address_columns = ["AddressId", "PersonId", "City", "State"]

# Create Address DataFrame
address_df = spark.createDataFrame(address_data, address_columns)

# Show the DataFrames
print("Person DataFrame:")
person_df.show()

print("Address DataFrame:")
address_df.show()


Person DataFrame:
+--------+---------+--------+
|PersonId|FirstName|LastName|
+--------+---------+--------+
|       1|     John|     Doe|
|       2|     Jane|   Smith|
|       3|      Sam|   Brown|
|       4|     Lisa|   White|
+--------+---------+--------+

Address DataFrame:
+---------+--------+-----------+-----+
|AddressId|PersonId|       City|State|
+---------+--------+-----------+-----+
|        1|       1|   New York|   NY|
|        2|       2|Los Angeles|   CA|
|        3|       4|    Chicago|   IL|
+---------+--------+-----------+-----+



In [0]:
df_pa=person_df.join(address_df,address_df.PersonId==person_df.PersonId,"inner").select("FirstName").show()

+---------+
|FirstName|
+---------+
|     John|
|     Jane|
|     Lisa|
+---------+



Several friends at a cinema ticket office would like to reserve consecutive available seats.
-- Can you help to query all the consecutive available seats order by the seat_id using the following cinema table?

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, lag

# Initialize SparkSession
spark = SparkSession.builder.appName("LeadLagExample").getOrCreate()

# Sample data for the cinema table
data = [
    (1, 1),
    (2, 0),
    (3, 1),
    (4, 1),
    (5, 1)
]

# Define the schema
columns = ["seat_id", "free"]

# Create the DataFrame
df = spark.createDataFrame(data, columns)

# Define a window specification ordered by seat_id
window_spec = Window.orderBy("seat_id")

# Add 'lead' and 'lag' columns
new_df = (
    df.withColumn("lead", lead("free", 1).over(window_spec))  # Adds next row's value
       .withColumn("lag", lag("free", 1).over(window_spec))   # Adds previous row's value
)

# Show the result
new_df.show()


+-------+----+----+----+
|seat_id|free|lead| lag|
+-------+----+----+----+
|      1|   1|   0|NULL|
|      2|   0|   1|   1|
|      3|   1|   1|   0|
|      4|   1|   1|   1|
|      5|   1|NULL|   1|
+-------+----+----+----+



In [0]:
consecutive_seats = new_df.filter((new_df.free == 1) & (new_df.lag == 1))
consecutive_seats.show()


+-------+----+----+---+
|seat_id|free|lead|lag|
+-------+----+----+---+
|      4|   1|   1|  1|
|      5|   1|NULL|  1|
+-------+----+----+---+



Query the customer_number from the orders table for the customer who has placed the largest number of orders.

-- It is guaranteed that exactly one customer will have placed more orders than any other customer.

-- The orders table is defined as follows:

-- | Column            | Type      |
-- |-------------------|-----------|
-- | order_number (PK) | int       |
-- | customer_number   | int       |
-- | order_date        | date      |
-- | required_date     | date      |
-- | shipped_date      | date      |
-- | status            | char(15)  |
-- | comment           | char(200) |
-- Sample Input

In [0]:

spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# Sample data for the orders table
data = [
    (1, 1, "2017-04-09", "2017-04-13", "2017-04-12", "Closed", ""),
    (2, 2, "2017-04-15", "2017-04-20", "2017-04-18", "Closed", ""),
    (3, 3, "2017-04-16", "2017-04-25", "2017-04-20", "Closed", ""),
    (4, 3, "2017-04-18", "2017-04-28", "2017-04-25", "Closed", "")
]

# Define the schema
columns = ["order_number", "customer_number", "order_date", "required_date", "shipped_date", "status", "comment"]

# Create the DataFrame
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show()


+------------+---------------+----------+-------------+------------+------+-------+
|order_number|customer_number|order_date|required_date|shipped_date|status|comment|
+------------+---------------+----------+-------------+------------+------+-------+
|           1|              1|2017-04-09|   2017-04-13|  2017-04-12|Closed|       |
|           2|              2|2017-04-15|   2017-04-20|  2017-04-18|Closed|       |
|           3|              3|2017-04-16|   2017-04-25|  2017-04-20|Closed|       |
|           4|              3|2017-04-18|   2017-04-28|  2017-04-25|Closed|       |
+------------+---------------+----------+-------------+------------+------+-------+



In [0]:

df.groupBy("customer_number").agg(count("*").alias("count")).orderBy("count",ascending=False ).limit(1).show()



+---------------+-----+
|customer_number|count|
+---------------+-----+
|              3|    2|
+---------------+-----+



- Question 13
-- Suppose that a website contains two tables, 
-- the Customers table and the Orders table. Write a SQL query to find all customers who never order anything.

-- Table: Customers.

-- +----+-------+
-- | Id | Name  |
-- +----+-------+
-- | 1  | Joe   |
-- | 2  | Henry |
-- | 3  | Sam   |
-- | 4  | Max   |
-- +----+-------+
-- Table: Orders.

-- +----+------------+
-- | Id | CustomerId |
-- +----+------------+
-- | 1  | 3          |
-- | 2  | 1          |
-- +----+------------+
-- Using the above tables as example, return the following:

In [0]:
# Initialize SparkSession
spark = SparkSession.builder.appName("CreateCustomersOrdersDataFrame").getOrCreate()

# Sample data for Customers and Orders tables
customers_data = [
    (1, "Joe"),
    (2, "Henry"),
    (3, "Sam"),
    (4, "Max")
]

orders_data = [
    (1, 3),
    (2, 1)
]

# Define the schema for Customers and Orders DataFrames
customers_columns = ["Id", "Name"]
orders_columns = ["Id", "CustomerId"]

# Create DataFrames
customers_df = spark.createDataFrame(customers_data, customers_columns)
orders_df = spark.createDataFrame(orders_data, orders_columns)

# Show the DataFrames
print("Customers DataFrame:")
customers_df.show()

print("Orders DataFrame:")
orders_df.show()

Customers DataFrame:
+---+-----+
| Id| Name|
+---+-----+
|  1|  Joe|
|  2|Henry|
|  3|  Sam|
|  4|  Max|
+---+-----+

Orders DataFrame:
+---+----------+
| Id|CustomerId|
+---+----------+
|  1|         3|
|  2|         1|
+---+----------+



In [0]:
from pyspark.sql import functions as F

customers_df \
    .join(orders_df, customers_df.Id == orders_df.CustomerId, "left_outer") \
    .filter(orders_df.CustomerId.isNull()) \
    .select("Name") \
    .show()

+-----+
| Name|
+-----+
|Henry|
|  Max|
+-----+



Write a SQL query to delete all duplicate email entries in a table named Person, keeping only unique emails based on its smallest Id.

-- +----+------------------+
-- | Id | Email            |
-- +----+------------------+
-- | 1  | john@example.com |
-- | 2  | bob@example.com  |
-- | 3  | john@example.com |
-- +----+------------------+
-- Id is the primary key column for this table.
-- For example, after running your query, the above Person table should have the following rows:

-- +----+------------------+
-- | Id | Email            |
-- +----+------------------+
-- | 1  | john@example.com |
-- | 2  | bob@example.com  |
-- +----+------------------+

In [0]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# Define the data
data = [
    (1, "john@example.com"),
    (2, "bob@example.com"),
    (3, "john@example.com")
]

# Define the schema
columns = ["Id", "Email"]

# Create a DataFrame
df = spark.createDataFrame(data, schema=columns)

# Show the DataFrame
df.show()


+---+----------------+
| Id|           Email|
+---+----------------+
|  1|john@example.com|
|  2| bob@example.com|
|  3|john@example.com|
+---+----------------+



In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

In [0]:
windows=Window.partitionBy("Email").orderBy("Id")
df=df.withColumn("row",row_number().over(windows))
# Filter to keep only the first occurrence of each Email
#df = df.filter("row = 1").drop("row")
df.show()

+---+----------------+---+
| Id|           Email|row|
+---+----------------+---+
|  2| bob@example.com|  1|
|  1|john@example.com|  1|
|  3|john@example.com|  2|
+---+----------------+---+



## -- Write a SQL query to find all duplicate emails in a table named Person.

In [0]:
df.groupBy("Email").agg(count("*").alias("count")).filter(col("count")>1).show()

+----------------+-----+
|           Email|count|
+----------------+-----+
|john@example.com|    2|
+----------------+-----+



Question 4
-- Select all employee's name and bonus whose bonus is < 1000.

-- Table:Employee


-- +-------+--------+-----------+--------+
-- | empId |  name  | supervisor| salary |
-- +-------+--------+-----------+--------+
-- |   1   | John   |  3        | 1000   |
-- |   2   | Dan    |  3        | 2000   |
-- |   3   | Brad   |  null     | 4000   |
-- |   4   | Thomas |  3        | 4000   |
-- +-------+--------+-----------+--------+
-- empId is the primary key column for this table.
-- Table: Bonus


-- +-------+-------+
-- | empId | bonus |
-- +-------+-------+
-- | 2     | 500   |
-- | 4     | 2000  |
-- +-------+-------+
-- empId is the primary key column for this table.
-- Example ouput:


In [0]:
spark = SparkSession.builder.appName("Employee Bonus").getOrCreate()

# Create the Employee DataFrame
employee_data = [
    (1, "John", 3, 1000),
    (2, "Dan", 3, 2000),
    (3, "Brad", None, 4000),
    (4, "Thomas", 3, 4000)
]
employee_columns = ["empId", "name", "supervisor", "salary"]

employee_df = spark.createDataFrame(data=employee_data, schema=employee_columns)

# Create the Bonus DataFrame
bonus_data = [
    (2, 500),
    (4, 2000)
]
bonus_columns = ["empId", "bonus"]

bonus_df = spark.createDataFrame(data=bonus_data, schema=bonus_columns)

In [0]:
new_df=employee_df.join(bonus_df,employee_df.empId==bonus_df.empId,"left")
new_df.show()
new_df.filter("bonus<1000 & bonus is null").show()

+-----+------+----------+------+-----+-----+
|empId|  name|supervisor|salary|empId|bonus|
+-----+------+----------+------+-----+-----+
|    1|  John|         3|  1000| NULL| NULL|
|    2|   Dan|         3|  2000|    2|  500|
|    3|  Brad|      NULL|  4000| NULL| NULL|
|    4|Thomas|         3|  4000|    4| 2000|
+-----+------+----------+------+-----+-----+

+-----+----+----------+------+-----+-----+
|empId|name|supervisor|salary|empId|bonus|
+-----+----+----------+------+-----+-----+
|    1|John|         3|  1000| NULL| NULL|
|    3|Brad|      NULL|  4000| NULL| NULL|
+-----+----+----------+------+-----+-----+



Question 15
**-- The Employee table holds all employees including their managers. 
-- Every employee has an Id, and there is also a column for the manager Id.


-- Given the Employee table, write a SQL query that finds out employees who earn more than their managers. 
-- For the above table, Joe is the only employee who earns more than his manager.**

In [0]:
spark = SparkSession.builder.appName("Self Join Example").getOrCreate()

# Create the Employee DataFrame
employee_data = [
    (1, "John", 3, 1000),
    (2, "Dan", 3, 5800),
    (3, "Brad", None, 4000),
    (4, "Thomas", 3, 1000)
]
employee_columns = ["empId", "name", "supervisor", "salary"]

employee_df = spark.createDataFrame(data=employee_data, schema=employee_columns)

In [0]:
employee_df.alias("e1").join(employee_df.alias("e2"),col("e1.supervisor")==col("e2.empId"),"left").filter("e1.salary>e2.salary").show()

+-----+----+----------+------+-----+----+----------+------+
|empId|name|supervisor|salary|empId|name|supervisor|salary|
+-----+----+----------+------+-----+----+----------+------+
|    2| Dan|         3|  5800|    3|Brad|      NULL|  4000|
+-----+----+----------+------+-----+----+----------+------+



## Write a query to return the list of customers NOT referred by the person with id '2'.

In [0]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("Customer Table Example").getOrCreate()

# Data for the customer table
customer_data = [
    (1, "Will", None),
    (2, "Jane", None),
    (3, "Alex", 2),
    (4, "Bill", None),
    (5, "Zack", 1),
    (6, "Mark", 2)
]

# Define column names
customer_columns = ["id", "name", "referee_id"]

# Create the DataFrame
customer_df = spark.createDataFrame(data=customer_data, schema=customer_columns)

# Show the DataFrame
customer_df.show()


+---+----+----------+
| id|name|referee_id|
+---+----+----------+
|  1|Will|      NULL|
|  2|Jane|      NULL|
|  3|Alex|         2|
|  4|Bill|      NULL|
|  5|Zack|         1|
|  6|Mark|         2|
+---+----+----------+



In [0]:
customer_df.filter("referee_id <>2 or referee_id is null").show()

+---+----+----------+
| id|name|referee_id|
+---+----+----------+
|  1|Will|      NULL|
|  2|Jane|      NULL|
|  4|Bill|      NULL|
|  5|Zack|         1|
+---+----+----------+



## Write an SQL query to find the team size of each of the employees.

In [0]:
employee_data = [
    (1, 101),
    (2, 102),
    (3, 101),
    (4, 103),
    (5, 102)
]

# Define column names
employee_columns = ["employee_id", "team_id"]

# Create the DataFrame
employee_df = spark.createDataFrame(data=employee_data, schema=employee_columns)

# Show the DataFrame
employee_df.show()

+-----------+-------+
|employee_id|team_id|
+-----------+-------+
|          1|    101|
|          2|    102|
|          3|    101|
|          4|    103|
|          5|    102|
+-----------+-------+



In [0]:
new_df= employee_df.groupBy("team_id").agg(count("*").alias("team_size"))
new_df.show()

+-------+---------+
|team_id|team_size|
+-------+---------+
|    101|        2|
|    102|        2|
|    103|        1|
+-------+---------+



In [0]:
new_em=employee_df.join(new_df,employee_df.team_id==new_df.team_id,"left").select("employee_id","team_size")
new_em.show()

+-----------+---------+
|employee_id|team_size|
+-----------+---------+
|          1|        2|
|          2|        2|
|          3|        2|
|          4|        1|
|          5|        2|
+-----------+---------+



###  Write an SQL query to report the distinct titles of the kid-friendly movies streamed in June 2020.

In [0]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder.appName("TVProgram and Content Tables").getOrCreate()

# Data for TVProgram table
tv_program_data = [
    ("2020-06-10 08:00", 1, "LC-Channel"),
    ("2020-05-11 12:00", 2, "LC-Channel"),
    ("2020-05-12 12:00", 3, "LC-Channel"),
    ("2020-05-13 14:00", 4, "Disney Ch"),
    ("2020-06-18 14:00", 4, "Disney Ch"),
    ("2020-07-15 16:00", 5, "Disney Ch")
]
tv_program_columns = ["program_date", "content_id", "channel"]

# Create TVProgram DataFrame
tv_program_df = spark.createDataFrame(data=tv_program_data, schema=tv_program_columns)

# Data for Content table
content_data = [
    (1, "Leetcode Movie", "N", "Movies"),
    (2, "Alg. for Kids", "Y", "Series"),
    (3, "Database Sols", "N", "Series"),
    (4, "Aladdin", "Y", "Movies"),
    (5, "Cinderella", "Y", "Movies")
]
content_columns = ["content_id", "title", "Kids_content", "content_type"]

# Create Content DataFrame
content_df = spark.createDataFrame(data=content_data, schema=content_columns)

# Display the DataFrames
print("TVProgram Table:")
tv_program_df.show()

print("Content Table:")
content_df.show()


TVProgram Table:
+----------------+----------+----------+
|    program_date|content_id|   channel|
+----------------+----------+----------+
|2020-06-10 08:00|         1|LC-Channel|
|2020-05-11 12:00|         2|LC-Channel|
|2020-05-12 12:00|         3|LC-Channel|
|2020-05-13 14:00|         4| Disney Ch|
|2020-06-18 14:00|         4| Disney Ch|
|2020-07-15 16:00|         5| Disney Ch|
+----------------+----------+----------+

Content Table:
+----------+--------------+------------+------------+
|content_id|         title|Kids_content|content_type|
+----------+--------------+------------+------------+
|         1|Leetcode Movie|           N|      Movies|
|         2| Alg. for Kids|           Y|      Series|
|         3| Database Sols|           N|      Series|
|         4|       Aladdin|           Y|      Movies|
|         5|    Cinderella|           Y|      Movies|
+----------+--------------+------------+------------+



In [0]:
tv_program_df.join(content_df,tv_program_df.content_id==content_df.content_id,"left").\
  filter("Kids_content='Y'and program_date>'2020-05-31' and program_date<'2020-06-30'").select("program_date","title").show()

+----------------+-------+
|    program_date|  title|
+----------------+-------+
|2020-06-18 14:00|Aladdin|
+----------------+-------+



 Write an SQL query that reports the first login date for each player.

-- The query result format is in the following example:

In [0]:
spark = SparkSession.builder.appName("Activity Table").getOrCreate()

# Data for Activity table
activity_data = [
    (1, 2, "2016-03-01", 5),
    (1, 2, "2016-05-02", 6),
    (2, 3, "2017-06-25", 1),
    (3, 1, "2016-03-02", 0),
    (3, 4, "2018-07-03", 5)
]
activity_columns = ["player_id", "device_id", "event_date", "games_played"]

# Create Activity DataFrame
activity_df = spark.createDataFrame(data=activity_data, schema=activity_columns)

# Display the Activity DataFrame
print("Activity Table:")
activity_df.show()

Activity Table:
+---------+---------+----------+------------+
|player_id|device_id|event_date|games_played|
+---------+---------+----------+------------+
|        1|        2|2016-03-01|           5|
|        1|        2|2016-05-02|           6|
|        2|        3|2017-06-25|           1|
|        3|        1|2016-03-02|           0|
|        3|        4|2018-07-03|           5|
+---------+---------+----------+------------+



In [0]:
window_spec = Window.partitionBy("player_id").orderBy("event_date")
activity_df_with_row = activity_df.withColumn("row_number", F.row_number().over(window_spec))
result_df = activity_df_with_row.filter("row_number = 1").drop("row_number")