In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\Spark\\spark-3.3.1-bin-hadoop2'

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("Interview_Questions").getOrCreate()

In [5]:
spark

In [6]:
from pyspark.sql.functions import col

# Create a Spark session

# Sample data for the DataFrame
data = [
    (1, "Alice", 25, 50000),
    (2, "Bob", 35, 60000),
    (3, "Charlie", 28, 55000),
    (4, "David", 40, 70000),
    (5, "Eve", 22, 48000)
]

# Create a DataFrame with the given schema
columns = ["id", "name", "age", "salary"]
df = spark.createDataFrame(data, columns)

# Show the created DataFrame
df.show()

# Expected Output:
# +---+-------+---+------+
# | id|   name|age|salary|
# +---+-------+---+------+
# |  1|  Alice| 25| 50000|
# |  2|    Bob| 35| 60000|
# |  3|Charlie| 28| 55000|
# |  4|  David| 40| 70000|
# |  5|    Eve| 22| 48000|
# +---+-------+---+------+


+---+-------+---+------+
| id|   name|age|salary|
+---+-------+---+------+
|  1|  Alice| 25| 50000|
|  2|    Bob| 35| 60000|
|  3|Charlie| 28| 55000|
|  4|  David| 40| 70000|
|  5|    Eve| 22| 48000|
+---+-------+---+------+



In [48]:
# Select only the name and salary columns.

df.select(col('name'),col('salary')).show()

+-------+------+
|   name|salary|
+-------+------+
|  Alice| 50000|
|    Bob| 60000|
|Charlie| 55000|
|  David| 70000|
|    Eve| 48000|
+-------+------+



In [8]:
df.select("name","name").show()

+-------+-------+
|   name|   name|
+-------+-------+
|  Alice|  Alice|
|    Bob|    Bob|
|Charlie|Charlie|
|  David|  David|
|    Eve|    Eve|
+-------+-------+



In [9]:
df.select('name','salary').show()

+-------+------+
|   name|salary|
+-------+------+
|  Alice| 50000|
|    Bob| 60000|
|Charlie| 55000|
|  David| 70000|
|    Eve| 48000|
+-------+------+



In [10]:
# Filter the DataFrame to only include rows where age is greater than 30.
df.filter(col('age') >30).show()

+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  2|  Bob| 35| 60000|
|  4|David| 40| 70000|
+---+-----+---+------+



In [9]:
df.filter(df.age >30).show()

+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  2|  Bob| 35| 60000|
|  4|David| 40| 70000|
+---+-----+---+------+



In [11]:
# Correct syntax using 'where'
df.where(col("age") > 30).show()


+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  2|  Bob| 35| 60000|
|  4|David| 40| 70000|
+---+-----+---+------+



In [12]:
from pyspark.sql.functions import to_date


# Sample data for the DataFrame
data = [
    (1, "2022-01-01", 10, 100.0),
    (2, "2022-01-02", 20, 300.0),
    (1, "2022-01-03", 15, 150.0),
    (3, "2022-01-04", 25, 500.0),
    (2, "2022-01-05", 30, 600.0),
]

# Create a DataFrame with the given schema
columns = ["product_id", "date", "quantity", "revenue"]
df = spark.createDataFrame(data, columns)

# Convert the 'date' column to a proper date format
df = df.withColumn("date", to_date(df["date"], "yyyy-MM-dd"))

# Show the created 
df.printSchema()
df.show()

# Expected output:
# +-----------+----------+--------+--------+
# | product_id|      date|quantity| revenue|
# +-----------+----------+--------+--------+
# |          1|2022-01-01|      10|   100.0|
# |          2|2022-01-02|      20|   300.0|
# |          1|2022-01-03|      15|   150.0|
# |          3|2022-01-04|      25|   500.0|
# |          2|2022-01-05|      30|   600.0|
# +-----------+----------+--------+--------+


root
 |-- product_id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- quantity: long (nullable = true)
 |-- revenue: double (nullable = true)

+----------+----------+--------+-------+
|product_id|      date|quantity|revenue|
+----------+----------+--------+-------+
|         1|2022-01-01|      10|  100.0|
|         2|2022-01-02|      20|  300.0|
|         1|2022-01-03|      15|  150.0|
|         3|2022-01-04|      25|  500.0|
|         2|2022-01-05|      30|  600.0|
+----------+----------+--------+-------+



In [13]:
# Group the data by product_id and calculate the total quantity and revenue for each product.
from pyspark.sql.functions import *

grouped_df = df.groupBy('product_id').agg(sum('quantity').alias('total_quantity'),sum('revenue').alias('total_revenue') )
grouped_df.show()

+----------+--------------+-------------+
|product_id|total_quantity|total_revenue|
+----------+--------------+-------------+
|         1|            25|        250.0|
|         2|            50|        900.0|
|         3|            25|        500.0|
+----------+--------------+-------------+



In [14]:
df.createOrReplaceTempView("sales_view")


# Using Spark SQL to query the temporary view
query = "SELECT product_id,sum(quantity),sum(revenue) FROM sales_view group by product_id"
result = spark.sql(query)



In [15]:
result.show()

+----------+-------------+------------+
|product_id|sum(quantity)|sum(revenue)|
+----------+-------------+------------+
|         1|           25|       250.0|
|         2|           50|       900.0|
|         3|           25|       500.0|
+----------+-------------+------------+



In [16]:
# Find the product with the highest total revenue.
# Get the product with the highest total revenue
max_revenue_product = grouped_df.orderBy("total_revenue", ascending=False).first()

print("Product with the highest total revenue:")
print(max_revenue_product)

# Expected output:
# Product with the highest total revenue:
# Row(product_id=2, total_quantity=50, total_revenue=900.0)


Product with the highest total revenue:
Row(product_id=2, total_quantity=50, total_revenue=900.0)


In [17]:
# Find the product with the highest total revenue.
result2 = spark.sql("""
with cte as 
                    (select product_id,sum(revenue) as total_revenue from sales_view group by product_id order by total_revenue desc)
                    
                    select * from cte limit 1
                    """).show()


+----------+-------------+
|product_id|total_revenue|
+----------+-------------+
|         2|        900.0|
+----------+-------------+



In [18]:
# Create sample data for the 'employees' DataFrame
employees_data = [
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Charlie", 101),
    (4, "David", 103),
    (5, "Eve", 102)
]

# Create the 'employees' DataFrame
employees_columns = ["employee_id", "name", "department_id"]
employees_df = spark.createDataFrame(employees_data, employees_columns)

# Create sample data for the 'departments' DataFrame
departments_data = [
    (101, "Finance"),
    (102, "Marketing"),
    (103, "IT")
]

# Create the 'departments' DataFrame
departments_columns = ["department_id", "department_name"]
departments_df = spark.createDataFrame(departments_data, departments_columns)

# Display both DataFrames to check the initial data
print("Employees DataFrame:")
employees_df.show()

print("Departments DataFrame:")
departments_df.show()

# Expected output for 'employees':
# +------------+-------+-------------+
# | employee_id|   name|department_id|
# +------------+-------+-------------+
# |          1|  Alice|          101|
# |          2|    Bob|          102|
# |          3|Charlie|          101|
# |          4|  David|          103|
# |          5|    Eve|          102|
# +------------+-------+-------------+

# Expected output for 'departments':
# +-------------+--------------+
# |department_id|department_name|
# +-------------+--------------+
# |          101|      Finance|
# |          102|     Marketing|
# |          103|            IT|
# +-------------+--------------+


Employees DataFrame:
+-----------+-------+-------------+
|employee_id|   name|department_id|
+-----------+-------+-------------+
|          1|  Alice|          101|
|          2|    Bob|          102|
|          3|Charlie|          101|
|          4|  David|          103|
|          5|    Eve|          102|
+-----------+-------+-------------+

Departments DataFrame:
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|          101|        Finance|
|          102|      Marketing|
|          103|             IT|
+-------------+---------------+



In [19]:
# Join these DataFrames to get a new DataFrame with employee_id, name, and department_name.

new_df = employees_df.join(departments_df, on='department_id', how='inner')
new_df.show()

+-------------+-----------+-------+---------------+
|department_id|employee_id|   name|department_name|
+-------------+-----------+-------+---------------+
|          101|          1|  Alice|        Finance|
|          101|          3|Charlie|        Finance|
|          102|          2|    Bob|      Marketing|
|          102|          5|    Eve|      Marketing|
|          103|          4|  David|             IT|
+-------------+-----------+-------+---------------+



In [20]:
new_df.select('employee_id','name','department_name').show()

+-----------+-------+---------------+
|employee_id|   name|department_name|
+-----------+-------+---------------+
|          1|  Alice|        Finance|
|          3|Charlie|        Finance|
|          2|    Bob|      Marketing|
|          5|    Eve|      Marketing|
|          4|  David|             IT|
+-----------+-------+---------------+



In [21]:
# Filter the result to only include employees in the "Finance" department.

new_df.filter(col('department_name') == 'Finance').show()

+-------------+-----------+-------+---------------+
|department_id|employee_id|   name|department_name|
+-------------+-----------+-------+---------------+
|          101|          1|  Alice|        Finance|
|          101|          3|Charlie|        Finance|
+-------------+-----------+-------+---------------+



##### Creating a New Column


In [22]:
from pyspark.sql.functions import col, expr

# Create a Spark session
spark = SparkSession.builder.appName("DataFrameCreationExample").getOrCreate()

# Sample data for the DataFrame
orders_data = [
    (1, 101, 5, 20.0),  # order_id, product_id, quantity, price_per_unit
    (2, 102, 10, 15.0),
    (3, 103, 3, 50.0),
    (4, 104, 7, 30.0),
    (5, 105, 1, 100.0)
]

# Create the DataFrame with the given schema
orders_columns = ["order_id", "product_id", "quantity", "price_per_unit"]
orders_df = spark.createDataFrame(orders_data, orders_columns)

# Display the DataFrame to check the initial data
orders_df.show()

# Expected output:
# +--------+-----------+--------+--------------+
# |order_id|product_id |quantity|price_per_unit|
# +--------+-----------+--------+--------------+
# |       1|       101|       5|          20.0|
# |       2|       102|      10|          15.0|
# |       3|       103|       3|          50.0|
# |       4|       104|       7|          30.0|
# |       5|       105|       1|         100.0|
# +--------+-----------+--------+--------------+


+--------+----------+--------+--------------+
|order_id|product_id|quantity|price_per_unit|
+--------+----------+--------+--------------+
|       1|       101|       5|          20.0|
|       2|       102|      10|          15.0|
|       3|       103|       3|          50.0|
|       4|       104|       7|          30.0|
|       5|       105|       1|         100.0|
+--------+----------+--------+--------------+



In [23]:
# Add a new column total_price which is the product of quantity and price_per_unit.

orders_df.withColumn("total_price ", orders_df.quantity * orders_df.price_per_unit).show()


+--------+----------+--------+--------------+------------+
|order_id|product_id|quantity|price_per_unit|total_price |
+--------+----------+--------+--------------+------------+
|       1|       101|       5|          20.0|       100.0|
|       2|       102|      10|          15.0|       150.0|
|       3|       103|       3|          50.0|       150.0|
|       4|       104|       7|          30.0|       210.0|
|       5|       105|       1|         100.0|       100.0|
+--------+----------+--------+--------------+------------+



In [24]:
# Apply a discount of 10% if the total_price is greater than 100, and create a new column final_price with the discounted price.


from pyspark.sql.functions import col, when

# Create the 'total_price' column
orders_df = orders_df.withColumn("total_price",col("quantity") * col("price_per_unit"))

# Create the 'final_price' column with a conditional discount
orders_df = orders_df.withColumn(
    "final_price",
    when(col("total_price") > 100, col("total_price") * 0.90).otherwise(col("total_price"))
)

# Display the DataFrame to see the final result
orders_df.show()




+--------+----------+--------+--------------+-----------+-----------+
|order_id|product_id|quantity|price_per_unit|total_price|final_price|
+--------+----------+--------+--------------+-----------+-----------+
|       1|       101|       5|          20.0|      100.0|      100.0|
|       2|       102|      10|          15.0|      150.0|      135.0|
|       3|       103|       3|          50.0|      150.0|      135.0|
|       4|       104|       7|          30.0|      210.0|      189.0|
|       5|       105|       1|         100.0|      100.0|      100.0|
+--------+----------+--------+--------------+-----------+-----------+



##### Working with Dates


In [25]:
# Convert the event_date column to a date type.
# Add a new column year to represent the year of each event.
# Count the number of events for each year.

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date



# Sample data for the DataFrame
events_data = [
    (1, "Music Concert", "2022-05-14"),
    (2, "Art Exhibition", "2023-02-10"),
    (3, "Tech Conference", "2022-11-20"),
    (4, "Food Festival", "2023-07-23"),
    (5, "Film Premiere", "2022-09-05"),
]

# Create the DataFrame with the given schema
events_columns = ["event_id", "event_name", "event_date"]
events_df = spark.createDataFrame(events_data, events_columns)

# Convert the 'event_date' from string to date type
events_df = events_df.withColumn("event_date", to_date(events_df["event_date"], "yyyy-MM-dd"))

# Display the DataFrame to check the initial data
events_df.printSchema()
events_df.show()



root
 |-- event_id: long (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_date: date (nullable = true)

+--------+---------------+----------+
|event_id|     event_name|event_date|
+--------+---------------+----------+
|       1|  Music Concert|2022-05-14|
|       2| Art Exhibition|2023-02-10|
|       3|Tech Conference|2022-11-20|
|       4|  Food Festival|2023-07-23|
|       5|  Film Premiere|2022-09-05|
+--------+---------------+----------+



In [27]:
# Add a new column year to represent the year of each event.

from pyspark.sql import SparkSession, functions as F
events_df.withColumn("year", F.year(events_df.event_date)).show()


+--------+---------------+----------+----+
|event_id|     event_name|event_date|year|
+--------+---------------+----------+----+
|       1|  Music Concert|2022-05-14|2022|
|       2| Art Exhibition|2023-02-10|2023|
|       3|Tech Conference|2022-11-20|2022|
|       4|  Food Festival|2023-07-23|2023|
|       5|  Film Premiere|2022-09-05|2022|
+--------+---------------+----------+----+



In [28]:
events_df2 = events_df.withColumn("year", F.year(events_df.event_date))
events_df2.show()

+--------+---------------+----------+----+
|event_id|     event_name|event_date|year|
+--------+---------------+----------+----+
|       1|  Music Concert|2022-05-14|2022|
|       2| Art Exhibition|2023-02-10|2023|
|       3|Tech Conference|2022-11-20|2022|
|       4|  Food Festival|2023-07-23|2023|
|       5|  Film Premiere|2022-09-05|2022|
+--------+---------------+----------+----+



In [29]:
# Count the number of events for each year.
events_df2.groupBy('year').agg(count('event_name').alias('no_of_events')).show()


+----+------------+
|year|no_of_events|
+----+------------+
|2022|           3|
|2023|           2|
+----+------------+



### Advanced PySpark Operations

##### Window Functions

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType



# Define the schema for the DataFrame
schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("amount", DoubleType(), True)
])

# Create sample data for the DataFrame
transactions_data = [
    (1, 101, "2022-01-01", 100.0),  # transaction_id, customer_id, date, amount
    (2, 101, "2022-01-05", 200.0),
    (3, 102, "2022-01-03", 150.0),
    (4, 102, "2022-01-10", 300.0),
    (5, 101, "2022-01-08", 150.0),
]

# Create the DataFrame with the specified schema
transactions_df = spark.createDataFrame(transactions_data, schema)

# Convert the 'date' column from string to date type
transactions_df = transactions_df.withColumn("date", to_date(transactions_df["date"], "yyyy-MM-dd"))

# Display the DataFrame to check the initial data
transactions_df.show()

# Expected output:
# +--------------+-----------+----------+------+
# |transaction_id|customer_id|      date|amount|
# +--------------+-----------+----------+------+
# |             1|        101|2022-01-01| 100.0|
# |             2|        101|2022-01-05| 200.0|
# |             3|        102|2022-01-03| 150.0|
# |             4|        102|2022-01-10| 300.0|
# |             5|        101|2022-01-08| 150.0|
# +--------------+-----------+----------+------+


+--------------+-----------+----------+------+
|transaction_id|customer_id|      date|amount|
+--------------+-----------+----------+------+
|             1|        101|2022-01-01| 100.0|
|             2|        101|2022-01-05| 200.0|
|             3|        102|2022-01-03| 150.0|
|             4|        102|2022-01-10| 300.0|
|             5|        101|2022-01-08| 150.0|
+--------------+-----------+----------+------+



In [31]:
# Add a new column total_amount_per_customer that contains the running total of amount for each customer_id, ordered by date.

In [32]:
from pyspark.sql.functions import sum
from pyspark.sql.window import Window

# Define the WindowSpec for partitioning by 'customer_id' and ordering by 'date'
window_spec = Window.partitionBy("customer_id").orderBy("date")

# Calculate the running total of 'amount' for each 'customer_id'
transactions_df2 = transactions_df.withColumn(
    "running_total",
    sum("amount").over(window_spec)
)

# Display the resulting DataFrame
transactions_df2.show()



+--------------+-----------+----------+------+-------------+
|transaction_id|customer_id|      date|amount|running_total|
+--------------+-----------+----------+------+-------------+
|             1|        101|2022-01-01| 100.0|        100.0|
|             2|        101|2022-01-05| 200.0|        300.0|
|             5|        101|2022-01-08| 150.0|        450.0|
|             3|        102|2022-01-03| 150.0|        150.0|
|             4|        102|2022-01-10| 300.0|        450.0|
+--------------+-----------+----------+------+-------------+



In [33]:
transactions_df.createOrReplaceTempView("my_query")
transactions_df.show()

+--------------+-----------+----------+------+
|transaction_id|customer_id|      date|amount|
+--------------+-----------+----------+------+
|             1|        101|2022-01-01| 100.0|
|             2|        101|2022-01-05| 200.0|
|             3|        102|2022-01-03| 150.0|
|             4|        102|2022-01-10| 300.0|
|             5|        101|2022-01-08| 150.0|
+--------------+-----------+----------+------+



In [34]:
result = spark.sql("select *,sum(amount) over(partition by customer_id order by date rows between unbounded preceding and current row ) as running_sum from my_query")


In [35]:
result.show()

+--------------+-----------+----------+------+-----------+
|transaction_id|customer_id|      date|amount|running_sum|
+--------------+-----------+----------+------+-----------+
|             1|        101|2022-01-01| 100.0|      100.0|
|             2|        101|2022-01-05| 200.0|      300.0|
|             5|        101|2022-01-08| 150.0|      450.0|
|             3|        102|2022-01-03| 150.0|      150.0|
|             4|        102|2022-01-10| 300.0|      450.0|
+--------------+-----------+----------+------+-----------+



In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

# Sample data for the DataFrame
users_data = [
    (1, "Alice", ["running", "swimming", "cycling"]),
    (2, "Bob", ["hiking", "climbing"]),
    (3, "Charlie", ["running", "gym", "yoga"]),
    (4, "David", ["swimming"]),
    (5, "Eve", ["yoga", "meditation", "pilates"])
]

# Create a DataFrame with an array-type column
users_columns = ["user_id", "name", "activities"]
users_df = spark.createDataFrame(users_data, users_columns)

# Display the DataFrame to check the initial data
users_df.show(truncate=False)

# Expected output:
# +-------+-------+--------------------------+
# |user_id|name   |activities                |
# +-------+-------+--------------------------+
# |1      |Alice  |[running, swimming, cycling] |
# |2      |Bob    |[hiking, climbing]        |
# |3      |Charlie|[running, gym, yoga]      |
# |4      |David  |[swimming]                |
# |5      |Eve    |[yoga, meditation, pilates]|
# +-------+-------+--------------------------+


+-------+-------+----------------------------+
|user_id|name   |activities                  |
+-------+-------+----------------------------+
|1      |Alice  |[running, swimming, cycling]|
|2      |Bob    |[hiking, climbing]          |
|3      |Charlie|[running, gym, yoga]        |
|4      |David  |[swimming]                  |
|5      |Eve    |[yoga, meditation, pilates] |
+-------+-------+----------------------------+



In [37]:
users_df.withColumn("activity",explode("activities")).show()

+-------+-------+--------------------+----------+
|user_id|   name|          activities|  activity|
+-------+-------+--------------------+----------+
|      1|  Alice|[running, swimmin...|   running|
|      1|  Alice|[running, swimmin...|  swimming|
|      1|  Alice|[running, swimmin...|   cycling|
|      2|    Bob|  [hiking, climbing]|    hiking|
|      2|    Bob|  [hiking, climbing]|  climbing|
|      3|Charlie|[running, gym, yoga]|   running|
|      3|Charlie|[running, gym, yoga]|       gym|
|      3|Charlie|[running, gym, yoga]|      yoga|
|      4|  David|          [swimming]|  swimming|
|      5|    Eve|[yoga, meditation...|      yoga|
|      5|    Eve|[yoga, meditation...|meditation|
|      5|    Eve|[yoga, meditation...|   pilates|
+-------+-------+--------------------+----------+



In [38]:
exploded_df = users_df.withColumn("activity", explode("activities")).drop("activities")


In [39]:
exploded_df.show()

+-------+-------+----------+
|user_id|   name|  activity|
+-------+-------+----------+
|      1|  Alice|   running|
|      1|  Alice|  swimming|
|      1|  Alice|   cycling|
|      2|    Bob|    hiking|
|      2|    Bob|  climbing|
|      3|Charlie|   running|
|      3|Charlie|       gym|
|      3|Charlie|      yoga|
|      4|  David|  swimming|
|      5|    Eve|      yoga|
|      5|    Eve|meditation|
|      5|    Eve|   pilates|
+-------+-------+----------+



#### Pivioting

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col



# Sample data for the DataFrame
scores_data = [
    (1, "Math", 95),  # student_id, subject, score
    (1, "Science", 88),
    (1, "English", 92),
    (2, "Math", 78),
    (2, "Science", 85),
    (2, "English", 81),
    (3, "Math", 87),
    (3, "Science", 92),
    (3, "English", 89)
]

# Create the DataFrame with the given schema
scores_columns = ["student_id", "subject", "score"]
scores_df = spark.createDataFrame(scores_data, scores_columns)

# Display the DataFrame to check the initial data
scores_df.show()



+----------+-------+-----+
|student_id|subject|score|
+----------+-------+-----+
|         1|   Math|   95|
|         1|Science|   88|
|         1|English|   92|
|         2|   Math|   78|
|         2|Science|   85|
|         2|English|   81|
|         3|   Math|   87|
|         3|Science|   92|
|         3|English|   89|
+----------+-------+-----+



In [41]:
scores_df.groupBy("student_id").pivot("subject").agg(avg("score")).show()

+----------+-------+----+-------+
|student_id|English|Math|Science|
+----------+-------+----+-------+
|         1|   92.0|95.0|   88.0|
|         3|   89.0|87.0|   92.0|
|         2|   81.0|78.0|   85.0|
+----------+-------+----+-------+



In [42]:
# Create the first DataFrame (df1)
df1_data = [
    (1, "Alice", "Finance"),
    (2, "Bob", "Marketing"),
    (3, "Charlie", "IT"),
    (4, "David", "Finance"),
    (5, "Eve", "HR")
]

df1_columns = ["employee_id", "name", "department"]
df1 = spark.createDataFrame(df1_data, df1_columns)

# Create the second DataFrame (df2)
df2_data = [
    (3, "Charlie", "IT"),  # This row is a duplicate
    (6, "Frank", "Marketing"),
    (7, "Grace", "Finance"),
    (8, "Hannah", "HR"),
    (9, "Ian", "IT")
]

df2_columns = ["employee_id", "name", "department"]
df2 = spark.createDataFrame(df2_data, df2_columns)

# Display both DataFrames to check the initial data
print("DataFrame 1:")
df1.show()

print("DataFrame 2:")
df2.show()

# Expected output for 'df1':
# +------------+-------+-----------+
# |employee_id | name  | department|
# +------------+-------+-----------+
# |          1| Alice |   Finance |
# |          2|  Bob  |  Marketing|
# |          3|Charlie|        IT |
# |          4| David |   Finance |
# |          5|  Eve  |        HR |
# +------------+-------+-----------+

# Expected output for 'df2':
# +------------+-------+-----------+
# |employee_id | name  | department|
# +------------+-------+-----------+
# |          3|Charlie|        IT  |
# |          6| Frank |  Marketing |
# |          7| Grace |   Finance  |
# |          8| Hannah|        HR  |
# |          9| Ian   |       IT   |
# +------------+-------+-----------+


DataFrame 1:
+-----------+-------+----------+
|employee_id|   name|department|
+-----------+-------+----------+
|          1|  Alice|   Finance|
|          2|    Bob| Marketing|
|          3|Charlie|        IT|
|          4|  David|   Finance|
|          5|    Eve|        HR|
+-----------+-------+----------+

DataFrame 2:
+-----------+-------+----------+
|employee_id|   name|department|
+-----------+-------+----------+
|          3|Charlie|        IT|
|          6|  Frank| Marketing|
|          7|  Grace|   Finance|
|          8| Hannah|        HR|
|          9|    Ian|        IT|
+-----------+-------+----------+



In [43]:
# Union the DataFrames.
# Drop any duplicate rows based on a unique identifier.

In [44]:
union_df = df1.union(df2)
union_df.show()

+-----------+-------+----------+
|employee_id|   name|department|
+-----------+-------+----------+
|          1|  Alice|   Finance|
|          2|    Bob| Marketing|
|          3|Charlie|        IT|
|          4|  David|   Finance|
|          5|    Eve|        HR|
|          3|Charlie|        IT|
|          6|  Frank| Marketing|
|          7|  Grace|   Finance|
|          8| Hannah|        HR|
|          9|    Ian|        IT|
+-----------+-------+----------+



In [45]:
union_df.drop_duplicates().show()

+-----------+-------+----------+
|employee_id|   name|department|
+-----------+-------+----------+
|          1|  Alice|   Finance|
|          2|    Bob| Marketing|
|          3|Charlie|        IT|
|          5|    Eve|        HR|
|          4|  David|   Finance|
|          6|  Frank| Marketing|
|          7|  Grace|   Finance|
|          8| Hannah|        HR|
|          9|    Ian|        IT|
+-----------+-------+----------+



#### Udfs

In [46]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema for the DataFrame
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])

# Sample data for the DataFrame
people_data = [
    ("Alice", 25, "F"),
    ("Bob", 17, "M"),
    ("Charlie", 30, "M"),
    ("David", 15, "M"),
    ("Eve", 22, "F")
]

# Create the DataFrame with the specified schema
people_df = spark.createDataFrame(people_data, schema)

# Display the DataFrame to check the initial data
people_df.show()

# Expected output:
# +-------+---+------+
# |  name |age|gender|
# +-------+---+------+
# | Alice | 25|   F  |
# |  Bob  | 17|   M  |
# |Charlie| 30|   M  |
# | David | 15|   M  |
# |  Eve  | 22|   F  |
# +-------+---+------+


+-------+---+------+
|   name|age|gender|
+-------+---+------+
|  Alice| 25|     F|
|    Bob| 17|     M|
|Charlie| 30|     M|
|  David| 15|     M|
|    Eve| 22|     F|
+-------+---+------+



In [47]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType



# Define a UDF to determine if a person is a minor
def is_minor(age):
    return age < 18

# Register the UDF with the appropriate return type
is_minor_udf = udf(is_minor, BooleanType())

# Add a new column 'is_minor' using the UDF
people_df = people_df.withColumn("is_minor", is_minor_udf("age"))

# Show the DataFrame with the new column
people_df.show()



+-------+---+------+--------+
|   name|age|gender|is_minor|
+-------+---+------+--------+
|  Alice| 25|     F|   false|
|    Bob| 17|     M|    true|
|Charlie| 30|     M|   false|
|  David| 15|     M|    true|
|    Eve| 22|     F|   false|
+-------+---+------+--------+



In [48]:

from pyspark.sql.functions import to_date



# Employees salary information
data_salary = [
    (100, "Raj", None, 1, "01-04-23", 50000),  # Employee ID, Name, Manager ID, Dept ID, Salary Date, Salary
    (200, "Joanne", 100, 1, "01-04-23", 4000),
    (200, "Joanne", 100, 1, "13-04-23", 4500),
    (200, "Joanne", 100, 1, "14-04-23", 4020),
]

# Schema for the salary DataFrame
schema_salary = ["EmpId", "EmpName", "Mgrid", "deptid", "salarydt", "salary"]

# Create the salary DataFrame
df_salary = spark.createDataFrame(data_salary, schema_salary)

# Convert 'salarydt' from string to date type
df_salary = df_salary.withColumn("salarydt", to_date(df_salary["salarydt"], "dd-MM-yy"))

# Department information
data_dept = [(1, "IT"), (2, "HR")]
schema_dept = ["deptid", "deptname"]

# Create the department DataFrame
df_dept = spark.createDataFrame(data_dept, schema_dept)

# Display the DataFrames
print("Salary DataFrame:")
df_salary.show()

print("Department DataFrame:")
df_dept.show()



Salary DataFrame:
+-----+-------+-----+------+----------+------+
|EmpId|EmpName|Mgrid|deptid|  salarydt|salary|
+-----+-------+-----+------+----------+------+
|  100|    Raj| null|     1|2023-04-01| 50000|
|  200| Joanne|  100|     1|2023-04-01|  4000|
|  200| Joanne|  100|     1|2023-04-13|  4500|
|  200| Joanne|  100|     1|2023-04-14|  4020|
+-----+-------+-----+------+----------+------+

Department DataFrame:
+------+--------+
|deptid|deptname|
+------+--------+
|     1|      IT|
|     2|      HR|
+------+--------+



In [49]:
df6 = df_salary.join(df_dept, on = 'deptid', how = 'inner')
df6.show()
df6.printSchema()

+------+-----+-------+-----+----------+------+--------+
|deptid|EmpId|EmpName|Mgrid|  salarydt|salary|deptname|
+------+-----+-------+-----+----------+------+--------+
|     1|  100|    Raj| null|2023-04-01| 50000|      IT|
|     1|  200| Joanne|  100|2023-04-01|  4000|      IT|
|     1|  200| Joanne|  100|2023-04-13|  4500|      IT|
|     1|  200| Joanne|  100|2023-04-14|  4020|      IT|
+------+-----+-------+-----+----------+------+--------+

root
 |-- deptid: long (nullable = true)
 |-- EmpId: long (nullable = true)
 |-- EmpName: string (nullable = true)
 |-- Mgrid: long (nullable = true)
 |-- salarydt: date (nullable = true)
 |-- salary: long (nullable = true)
 |-- deptname: string (nullable = true)



In [50]:
df5 = df_salary.join(df_dept, ['deptid'])
df5.show()

+------+-----+-------+-----+----------+------+--------+
|deptid|EmpId|EmpName|Mgrid|  salarydt|salary|deptname|
+------+-----+-------+-----+----------+------+--------+
|     1|  100|    Raj| null|2023-04-01| 50000|      IT|
|     1|  200| Joanne|  100|2023-04-01|  4000|      IT|
|     1|  200| Joanne|  100|2023-04-13|  4500|      IT|
|     1|  200| Joanne|  100|2023-04-14|  4020|      IT|
+------+-----+-------+-----+----------+------+--------+



In [51]:
df4 = df5.alias('a').join(df5.alias('b'), col('a.Mgrid')==col('b.Mgrid'), how = 'left').select(
    col('a.deptname'),
    col('b.EmpName').alias('Manager_Name'),
    col('a.EmpName'),
    col('a.salarydt'),
    col('a.salary')
)

In [52]:
df4.show()

+--------+------------+-------+----------+------+
|deptname|Manager_Name|EmpName|  salarydt|salary|
+--------+------------+-------+----------+------+
|      IT|        null|    Raj|2023-04-01| 50000|
|      IT|      Joanne| Joanne|2023-04-01|  4000|
|      IT|      Joanne| Joanne|2023-04-01|  4000|
|      IT|      Joanne| Joanne|2023-04-01|  4000|
|      IT|      Joanne| Joanne|2023-04-13|  4500|
|      IT|      Joanne| Joanne|2023-04-13|  4500|
|      IT|      Joanne| Joanne|2023-04-13|  4500|
|      IT|      Joanne| Joanne|2023-04-14|  4020|
|      IT|      Joanne| Joanne|2023-04-14|  4020|
|      IT|      Joanne| Joanne|2023-04-14|  4020|
+--------+------------+-------+----------+------+



In [53]:
from pyspark.sql.functions import year, month

In [54]:
df3 = df4.groupby('deptname','Manager_Name','EmpName',year('salarydt').alias('year'),date_format('salarydt','MMM').alias('month')).sum('salary')

In [55]:
df3.show()

+--------+------------+-------+----+-----+-----------+
|deptname|Manager_Name|EmpName|year|month|sum(salary)|
+--------+------------+-------+----+-----+-----------+
|      IT|        null|    Raj|2023|  Apr|      50000|
|      IT|      Joanne| Joanne|2023|  Apr|      37560|
+--------+------------+-------+----+-----+-----------+



In [56]:
from pyspark.sql import functions as F

In [57]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# Sample DataFrame with given data
data = [
    (1, 100, "Raj", None, "2023-04-01", 50000, "IT"),
    (1, 200, "Joanne", 100, "2023-04-01", 4000, "IT"),
    (1, 200, "Joanne", 100, "2023-04-13", 4500, "IT"),
    (1, 200, "Joanne", 100, "2023-04-14", 4020, "IT"),
]

schema = ["deptid", "EmpId", "EmpName", "Mgrid", "salarydt", "salary", "deptname"]

df = spark.createDataFrame(data, schema)

# Alias the DataFrame for self left join
left_df = df.alias("left_df")
right_df = df.alias("right_df")

# Perform self left join on 'Mgrid' matching 'EmpId'
joined_df = left_df.join(
    right_df,
    F.col("left_df.Mgrid") == F.col("right_df.EmpId"),
    how="left"
).select(
    F.col("left_df.deptid"),
    F.col("left_df.EmpName"),
    F.col("right_df.EmpName").alias("Manager_Name"),
    F.col("left_df.salarydt"),
    F.col("left_df.salary"),
    F.col("left_df.deptname")
)

# Show the joined DataFrame
joined_df.show()

# Expected output:
# +-------+-------+-------------+----------+-------+--------+
# | deptid|EmpName|Manager_Name | salarydt | salary|deptname|
# +-------+-------+-------------+----------+-------+--------+
# |     1 |  Raj  |     null    |2023-04-01| 50000 |     IT |
# |     1 | Joanne|     Raj     |2023-04-01|  4000 |     IT |
# |     1 | Joanne|     Raj     |2023-04-13|  4500 |     IT |
# |     1 | Joanne|     Raj     |2023-04-14|  4020 |     IT |
# +-------+-------+-------------+----------+-------+--------+


+------+-------+------------+----------+------+--------+
|deptid|EmpName|Manager_Name|  salarydt|salary|deptname|
+------+-------+------------+----------+------+--------+
|     1|    Raj|        null|2023-04-01| 50000|      IT|
|     1| Joanne|         Raj|2023-04-01|  4000|      IT|
|     1| Joanne|         Raj|2023-04-13|  4500|      IT|
|     1| Joanne|         Raj|2023-04-14|  4020|      IT|
+------+-------+------------+----------+------+--------+



### Pyspark Window functions

##### row_number()

In [88]:
simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [76]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window_spec = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number", row_number().over(window_spec)).show()

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
+-------------+----------+------+----------+



In [77]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window_spec = Window.partitionBy("department").orderBy(desc("salary"))

df = df.withColumn("row_number", row_number().over(window_spec))
df.show()


+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|          Jen|   Finance|  3900|         1|
|        Scott|   Finance|  3300|         2|
|        Maria|   Finance|  3000|         3|
|         Jeff| Marketing|  3000|         1|
|        Kumar| Marketing|  2000|         2|
|      Michael|     Sales|  4600|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         3|
|        James|     Sales|  3000|         4|
|        James|     Sales|  3000|         5|
+-------------+----------+------+----------+



#### rank() Functions

In [78]:
from pyspark.sql.functions import rank

from pyspark.sql.window import Window

window_spec = Window.partitionBy("department").orderBy("salary")



df = df.withColumn("rank", rank().over(window_spec))
df.show

##### Dense_Rank()

In [81]:
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank", dense_rank().over(Window.partitionBy("department").orderBy("salary"))).show()

+-------------+----------+------+----------+----+----------+
|employee_name|department|salary|row_number|rank|dense_rank|
+-------------+----------+------+----------+----+----------+
|        Maria|   Finance|  3000|         3|   1|         1|
|        Scott|   Finance|  3300|         2|   2|         2|
|          Jen|   Finance|  3900|         1|   3|         3|
|        Kumar| Marketing|  2000|         2|   1|         1|
|         Jeff| Marketing|  3000|         1|   2|         2|
|        James|     Sales|  3000|         4|   1|         1|
|        James|     Sales|  3000|         5|   1|         1|
|       Robert|     Sales|  4100|         2|   3|         2|
|         Saif|     Sales|  4100|         3|   3|         2|
|      Michael|     Sales|  4600|         1|   5|         3|
+-------------+----------+------+----------+----+----------+



##### percent_rank Window Function

In [89]:
from pyspark.sql.functions import percent_rank

windowSpec = Window.partitionBy("department").orderBy("salary")
df_percent_rank = df.withColumn("percent_rank",percent_rank().over(windowSpec))
df_percent_rank.show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



##### ntile Window Function

In [92]:
from pyspark.sql.functions import ntile

df_ntile = df.withColumn("ntile", ntile(2).over(Window.partitionBy("department").orderBy("salary")))
df_ntile.show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+



In [93]:
import pyspark
from pyspark.sql import SparkSession


simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]

df = spark.createDataFrame(data = simpleData, schema = columns)

df.printSchema()
df.show(truncate=False)

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()
    
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()

from pyspark.sql.functions import cume_dist    
df.withColumn("cume_dist",cume_dist().over(windowSpec)) \
   .show()
 
from pyspark.sql.functions import lag    
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
      .show()

from pyspark.sql.functions import lead    
df.withColumn("lead",lead("salary",2).over(windowSpec)) \
    .show()
    
windowSpecAgg  = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number 
df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1

#### Pysaprk Union functions

In [94]:
# Imports
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000) \
  ]

columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+



In [95]:
# Create DataFrame2
simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns2= ["employee_name","department","state","salary","age","bonus"]

df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

df2.printSchema()
df2.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [101]:
df_union = df.union(df2)
df_union.show()   ## it will contain only unique records
df_union.count()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



9

In [102]:
df_union_all = df.unionAll(df2)
df_union_all.show()  ## It will contain duplicates records also
df_union_all.count()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



9

In [103]:
## Merge without Duplicates

# Remove duplicates after union() using distinct()
disDF = df.union(df2).distinct()
disDF.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
+-------------+----------+-----+------+---+-----+



In [104]:
disDF.createOrReplaceTempView("my_query")

In [105]:
spark.sql("select * from my_query").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
+-------------+----------+-----+------+---+-----+



###### Employees who earn more than their managers

In [49]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from datetime import datetime



# Convert date strings to datetime.date objects
def str_to_date(date_str):
    return datetime.strptime(date_str, "%Y-%m-%d").date()

# Define the data with date conversion
data = [
    {'emp_id': 68319, 'emp_name': 'KAYLING', 'job_name': 'PRESIDENT', 'manager_id': None, 'hire_date': str_to_date("1991-11-18"), 'salary': 6000.00, 'commission': None, 'dep_id': 1001},
    {'emp_id': 66928, 'emp_name': 'BLAZE', 'job_name': 'MANAGER', 'manager_id': 68319, 'hire_date': str_to_date("1991-05-01"), 'salary': 2750.00, 'commission': None, 'dep_id': 3001},
    {'emp_id': 67832, 'emp_name': 'CLARE', 'job_name': 'MANAGER', 'manager_id': 68319, 'hire_date': str_to_date("1991-06-09"), 'salary': 2550.00, 'commission': None, 'dep_id': 1001},
    {'emp_id': 65646, 'emp_name': 'JONAS', 'job_name': 'MANAGER', 'manager_id': 68319, 'hire_date': str_to_date("1991-04-02"), 'salary': 2957.00, 'commission': None, 'dep_id': 2001},
    {'emp_id': 67858, 'emp_name': 'SCARLET', 'job_name': 'ANALYST', 'manager_id': 65646, 'hire_date': str_to_date("1997-04-19"), 'salary': 3100.00, 'commission': None, 'dep_id': 2001},
    {'emp_id': 69062, 'emp_name': 'FRANK', 'job_name': 'ANALYST', 'manager_id': 65646, 'hire_date': str_to_date("1991-12-03"), 'salary': 3100.00, 'commission': None, 'dep_id': 2001},
    {'emp_id': 63679, 'emp_name': 'SANDRINE', 'job_name': 'CLERK', 'manager_id': 69062, 'hire_date': str_to_date("1990-12-18"), 'salary': 900.00, 'commission': None, 'dep_id': 2001},
    {'emp_id': 64989, 'emp_name': 'ADELYN', 'job_name': 'SALESMAN', 'manager_id': 66928, 'hire_date': str_to_date("1991-02-20"), 'salary': 1700.00, 'commission': 400.00, 'dep_id': 3001},
    {'emp_id': 65271, 'emp_name': 'WADE', 'job_name': 'SALESMAN', 'manager_id': 66928, 'hire_date': str_to_date("1991-02-22"), 'salary': 1350.00, 'commission': 600.00, 'dep_id': 3001},
    {'emp_id': 66564, 'emp_name': 'MADDEN', 'job_name': 'SALESMAN', 'manager_id': 66928, 'hire_date': str_to_date("1991-09-28"), 'salary': 1350.00, 'commission': 1500.00, 'dep_id': 3001},
    {'emp_id': 68454, 'emp_name': 'TUCKER', 'job_name': 'SALESMAN', 'manager_id': 66928, 'hire_date': str_to_date("1991-09-08"), 'salary': 1600.00, 'commission': 0.00, 'dep_id': 3001},
    {'emp_id': 68736, 'emp_name': 'ADNRES', 'job_name': 'CLERK', 'manager_id': 67858, 'hire_date': str_to_date("1997-05-23"), 'salary': 1200.00, 'commission': None, 'dep_id': 2001},
    {'emp_id': 69000, 'emp_name': 'JULIUS', 'job_name': 'CLERK', 'manager_id': 66928, 'hire_date': str_to_date("1991-12-03"), 'salary': 1050.00, 'commission': None, 'dep_id': 3001},
    {'emp_id': 69324, 'emp_name': 'MARKER', 'job_name': 'CLERK', 'manager_id': 67832, 'hire_date': str_to_date("1992-01-23"), 'salary': 1400.00, 'commission': None, 'dep_id': 1001}
]

# Define the schema
schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("emp_name", StringType(), True),
    StructField("job_name", StringType(), True),
    StructField("manager_id", IntegerType(), True),
    StructField("hire_date", DateType(), True),
    StructField("salary", FloatType(), True),
    StructField("commission", FloatType(), True),
    StructField("dep_id", IntegerType(), True)
])

# Create the DataFrame
df_employee = spark.createDataFrame(data, schema)

df_employee.printSchema()
df_employee.show()


root
 |-- emp_id: integer (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- job_name: string (nullable = true)
 |-- manager_id: integer (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- salary: float (nullable = true)
 |-- commission: float (nullable = true)
 |-- dep_id: integer (nullable = true)

+------+--------+---------+----------+----------+------+----------+------+
|emp_id|emp_name| job_name|manager_id| hire_date|salary|commission|dep_id|
+------+--------+---------+----------+----------+------+----------+------+
| 68319| KAYLING|PRESIDENT|      null|1991-11-18|6000.0|      null|  1001|
| 66928|   BLAZE|  MANAGER|     68319|1991-05-01|2750.0|      null|  3001|
| 67832|   CLARE|  MANAGER|     68319|1991-06-09|2550.0|      null|  1001|
| 65646|   JONAS|  MANAGER|     68319|1991-04-02|2957.0|      null|  2001|
| 67858| SCARLET|  ANALYST|     65646|1997-04-19|3100.0|      null|  2001|
| 69062|   FRANK|  ANALYST|     65646|1991-12-03|3100.0|      null|  2001|
|

In [51]:
df_employee.createOrReplaceTempView("my_table")

In [52]:
spark.sql("select * from my_table").show()

+------+--------+---------+----------+----------+------+----------+------+
|emp_id|emp_name| job_name|manager_id| hire_date|salary|commission|dep_id|
+------+--------+---------+----------+----------+------+----------+------+
| 68319| KAYLING|PRESIDENT|      null|1991-11-18|6000.0|      null|  1001|
| 66928|   BLAZE|  MANAGER|     68319|1991-05-01|2750.0|      null|  3001|
| 67832|   CLARE|  MANAGER|     68319|1991-06-09|2550.0|      null|  1001|
| 65646|   JONAS|  MANAGER|     68319|1991-04-02|2957.0|      null|  2001|
| 67858| SCARLET|  ANALYST|     65646|1997-04-19|3100.0|      null|  2001|
| 69062|   FRANK|  ANALYST|     65646|1991-12-03|3100.0|      null|  2001|
| 63679|SANDRINE|    CLERK|     69062|1990-12-18| 900.0|      null|  2001|
| 64989|  ADELYN| SALESMAN|     66928|1991-02-20|1700.0|     400.0|  3001|
| 65271|    WADE| SALESMAN|     66928|1991-02-22|1350.0|     600.0|  3001|
| 66564|  MADDEN| SALESMAN|     66928|1991-09-28|1350.0|    1500.0|  3001|
| 68454|  TUCKER| SALESMA

In [53]:
spark.sql("""select t1.emp_name as employee_name, t2.emp_name as manager_name, t1.salary as employee_salary, t2.salary as manager_salary from my_table t1
inner join my_table t2 on t1.manager_id = t2.emp_id
where t1.salary>t2.salary""").show()

+-------------+------------+---------------+--------------+
|employee_name|manager_name|employee_salary|manager_salary|
+-------------+------------+---------------+--------------+
|      SCARLET|       JONAS|         3100.0|        2957.0|
|        FRANK|       JONAS|         3100.0|        2957.0|
+-------------+------------+---------------+--------------+



In [54]:
df_employee_2 = df_employee
df_employee_2.show()

+------+--------+---------+----------+----------+------+----------+------+
|emp_id|emp_name| job_name|manager_id| hire_date|salary|commission|dep_id|
+------+--------+---------+----------+----------+------+----------+------+
| 68319| KAYLING|PRESIDENT|      null|1991-11-18|6000.0|      null|  1001|
| 66928|   BLAZE|  MANAGER|     68319|1991-05-01|2750.0|      null|  3001|
| 67832|   CLARE|  MANAGER|     68319|1991-06-09|2550.0|      null|  1001|
| 65646|   JONAS|  MANAGER|     68319|1991-04-02|2957.0|      null|  2001|
| 67858| SCARLET|  ANALYST|     65646|1997-04-19|3100.0|      null|  2001|
| 69062|   FRANK|  ANALYST|     65646|1991-12-03|3100.0|      null|  2001|
| 63679|SANDRINE|    CLERK|     69062|1990-12-18| 900.0|      null|  2001|
| 64989|  ADELYN| SALESMAN|     66928|1991-02-20|1700.0|     400.0|  3001|
| 65271|    WADE| SALESMAN|     66928|1991-02-22|1350.0|     600.0|  3001|
| 66564|  MADDEN| SALESMAN|     66928|1991-09-28|1350.0|    1500.0|  3001|
| 68454|  TUCKER| SALESMA

In [8]:
df=spark.createDataFrame(
        data = [ ("1","2019-06-24 12:01:19.000")],
        schema=["id","input_timestamp"])
df.printSchema()
df.show(truncate=False)

root
 |-- id: string (nullable = true)
 |-- input_timestamp: string (nullable = true)

+---+-----------------------+
|id |input_timestamp        |
+---+-----------------------+
|1  |2019-06-24 12:01:19.000|
+---+-----------------------+



In [9]:
from pyspark.sql.functions import *

In [10]:
df.withColumn("date_type", to_date("input_timestamp")).show(truncate=False)

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24|
+---+-----------------------+----------+



In [11]:
df.withColumn("date_type", to_date(current_timestamp())).show(truncate=False)

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2024-05-10|
+---+-----------------------+----------+



In [13]:
#Custom Timestamp format to DateType
df.select(to_date(lit('05-10-2024 12:01:19.000'),'MM-dd-yyyy HH:mm:ss.SSSS')) \
  .show()

+----------------------------------------------------------+
|to_date(05-10-2024 12:01:19.000, MM-dd-yyyy HH:mm:ss.SSSS)|
+----------------------------------------------------------+
|                                                2024-05-10|
+----------------------------------------------------------+



In [14]:
#Timestamp type to DateType
df.withColumn("ts",to_timestamp(col("input_timestamp"))) \
  .withColumn("datetype",to_date(col("ts"))) \
  .show(truncate=False)

+---+-----------------------+-------------------+----------+
|id |input_timestamp        |ts                 |datetype  |
+---+-----------------------+-------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24 12:01:19|2019-06-24|
+---+-----------------------+-------------------+----------+



In [67]:
# Using Cast to convert Timestamp String to DateType
df.withColumn('date_type', col('input_timestamp').cast('date')) \
       .show(truncate=False)

# Using Cast to convert TimestampType to DateType
df.withColumn('date_type', to_timestamp('input_timestamp').cast('date')) \
  .show(truncate=False)

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24|
+---+-----------------------+----------+

+---+-----------------------+----------+
|id |input_timestamp        |date_type |
+---+-----------------------+----------+
|1  |2019-06-24 12:01:19.000|2019-06-24|
+---+-----------------------+----------+



##### Broadcast join 

In [15]:
from pyspark.sql.functions import broadcast

# Chhota dataframe
cities = spark.createDataFrame([
    (1, "New York"),
    (2, "Los Angeles"),
    (3, "Chicago")
], ["city_id", "city_name"])

# Bada dataframe
people = spark.createDataFrame([
    (1, "John Doe", 1),
    (2, "Jane Doe", 2),
    (3, "Sam Smith", 3),
    (4, "Will Johnson", 1)
], ["id", "name", "city_id"])

# Broadcast Join
joined_df = people.join(broadcast(cities), people.city_id == cities.city_id)


In [16]:
joined_df.show()

+---+------------+-------+-------+-----------+
| id|        name|city_id|city_id|  city_name|
+---+------------+-------+-------+-----------+
|  1|    John Doe|      1|      1|   New York|
|  2|    Jane Doe|      2|      2|Los Angeles|
|  3|   Sam Smith|      3|      3|    Chicago|
|  4|Will Johnson|      1|      1|   New York|
+---+------------+-------+-------+-----------+

