<a href="https://colab.research.google.com/github/mujahed85/PySpark/blob/main/PySpark_Basic2Adv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Linux Basics

In [None]:
# Show the OS distribution and version of the notebook's runtime.
!cat /etc/os-release

PRETTY_NAME="Ubuntu 22.04.4 LTS"
NAME="Ubuntu"
VERSION_ID="22.04"
VERSION="22.04.4 LTS (Jammy Jellyfish)"
VERSION_CODENAME=jammy
ID=ubuntu
ID_LIKE=debian
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
UBUNTU_CODENAME=jammy


In [None]:
# Print kernel name, hostname, kernel release and machine architecture.
!uname -a

Linux 0f971c837430 6.1.123+ #1 SMP PREEMPT_DYNAMIC Sun Mar 30 16:01:29 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux


In [None]:
# Show which user the notebook's shell commands run as.
!whoami

root


In [None]:
# Show the current working directory; relative paths in later cells resolve from here.
!pwd

/content


In [None]:
from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) a SparkSession named "Basics".
# NOTE(review): this cell and its neighbours duplicate the cells that follow the
# `pip install` cell and run before it; they rely on PySpark being preinstalled.
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

In [None]:
# Build a one-row DataFrame with two string columns.
columns = ["Word1", "Word2"]
data = [("Hello", "World")]
df = spark.createDataFrame(data, schema=columns)

## PySpark DataFrame Basics

In [None]:
# Pin the version so every re-run gets the same PySpark (3.5.1 is what `pip show` reports below).
# %pip installs into the running kernel's environment; !pip may target a different interpreter.
%pip install -q pyspark==3.5.1



In [None]:
# Confirm the installed PySpark version, its install location and its dependencies.
!pip show pyspark

Name: pyspark
Version: 3.5.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.11/dist-packages
Requires: py4j
Required-by: dataproc-spark-connect


In [None]:
from pyspark.sql import SparkSession

In [None]:
# Entry point for the DataFrame API; getOrCreate() reuses a running session if there is one.
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

In [None]:
# A minimal DataFrame: one row, two string columns.
columns = ["Word1", "Word2"]
data = [("Hello", "World")]
df = spark.createDataFrame(data, schema=columns)

In [None]:
# Render as an ASCII table (at most 20 rows, long values truncated).
df.show(n=20, truncate=True)

+-----+-----+
|Word1|Word2|
+-----+-----+
|Hello|World|
+-----+-----+



## Basic Transformations and Actions

In [None]:
# Sample employee records as (Name, Department, Salary) tuples.
data = [
    ("John", "Sales", 3000),
    ("Jane", "Finance", 4000),
    ("Mike", "Sales", 3500),
    ("Alice", "Finance", 3800),
    ("Bob", "IT", 4500)
]
columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| John|     Sales|  3000|
| Jane|   Finance|  4000|
| Mike|     Sales|  3500|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
# Filter: keep only employees earning strictly more than 3500.
# `where` is an alias of `filter`.
df_filtered = df.where(df["Salary"] > 3500)
df_filtered.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| Jane|   Finance|  4000|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
# GroupBy + aggregate: mean salary per department (result column "avg(Salary)").
df_grouped = df.groupby("Department").avg("Salary")
df_grouped.show()

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3250.0|
|   Finance|     3900.0|
|        IT|     4500.0|
+----------+-----------+



In [None]:
# Add a new column: salary including a 10% bonus.
# Multiplying an integer by the float literal 1.1 exposes binary floating-point
# noise (3000 * 1.1 == 3300.0000000000005), so the result is rounded to 2 decimals.
# F.round is used via the module alias so Python's built-in round() stays intact.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

bonus_expr = F.round(col("Salary") * 1.1, 2)
df_with_bonus = df.withColumn("Salary_10%_Bonus", bonus_expr)
df_with_bonus.show()

+-----+----------+------+------------------+
| Name|Department|Salary|  Salary_10%_Bonus|
+-----+----------+------+------------------+
| John|     Sales|  3000|3300.0000000000005|
| Jane|   Finance|  4000|            4400.0|
| Mike|     Sales|  3500|3850.0000000000005|
|Alice|   Finance|  3800|            4180.0|
|  Bob|        IT|  4500|            4950.0|
+-----+----------+------+------------------+



In [None]:
from pyspark.sql.functions import col, concat_ws, length, lower, upper, when

# Re-display the base employee DataFrame before the column transformations below.
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| John|     Sales|  3000|
| Jane|   Finance|  4000|
| Mike|     Sales|  3500|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
# Change case: derive upper- and lower-cased copies of Name as new columns.
df_upper = df.withColumn("Name_Upper", upper("Name"))
df_lower = df.withColumn("Name_Lower", lower("Name"))

df_upper.show()
df_lower.show()

+-----+----------+------+----------+
| Name|Department|Salary|Name_Upper|
+-----+----------+------+----------+
| John|     Sales|  3000|      JOHN|
| Jane|   Finance|  4000|      JANE|
| Mike|     Sales|  3500|      MIKE|
|Alice|   Finance|  3800|     ALICE|
|  Bob|        IT|  4500|       BOB|
+-----+----------+------+----------+

+-----+----------+------+----------+
| Name|Department|Salary|Name_Lower|
+-----+----------+------+----------+
| John|     Sales|  3000|      john|
| Jane|   Finance|  4000|      jane|
| Mike|     Sales|  3500|      mike|
|Alice|   Finance|  3800|     alice|
|  Bob|        IT|  4500|       bob|
+-----+----------+------+----------+



In [None]:
# Concatenate Name and Department into one string, separated by " - ".
df_concat = df.withColumn(
    "Full_Info",
    concat_ws(" - ", "Name", "Department"),
)
df_concat.show()

+-----+----------+------+---------------+
| Name|Department|Salary|      Full_Info|
+-----+----------+------+---------------+
| John|     Sales|  3000|   John - Sales|
| Jane|   Finance|  4000| Jane - Finance|
| Mike|     Sales|  3500|   Mike - Sales|
|Alice|   Finance|  3800|Alice - Finance|
|  Bob|        IT|  4500|       Bob - IT|
+-----+----------+------+---------------+



In [None]:
# Character count of each Name, stored in a new column.
df_length = df.withColumn(
    "Name_Length",
    length("Name"),
)
df_length.show()

+-----+----------+------+-----------+
| Name|Department|Salary|Name_Length|
+-----+----------+------+-----------+
| John|     Sales|  3000|          4|
| Jane|   Finance|  4000|          4|
| Mike|     Sales|  3500|          4|
|Alice|   Finance|  3800|          5|
|  Bob|        IT|  4500|          3|
+-----+----------+------+-----------+



In [None]:
# Conditional column: >= 4000 is High, >= 3500 is Medium, anything else is Low.
# `when` branches are checked in order, so the first matching branch wins.
salary_category = (
    when(col("Salary") >= 4000, "High")
    .when(col("Salary") >= 3500, "Medium")
    .otherwise("Low")
)
df_conditional = df.withColumn("Salary_Category", salary_category)
df_conditional.show()

+-----+----------+------+---------------+
| Name|Department|Salary|Salary_Category|
+-----+----------+------+---------------+
| John|     Sales|  3000|            Low|
| Jane|   Finance|  4000|           High|
| Mike|     Sales|  3500|         Medium|
|Alice|   Finance|  3800|         Medium|
|  Bob|        IT|  4500|           High|
+-----+----------+------+---------------+



In [None]:
# Rename a column (Salary -> Base_Salary); the other columns are untouched.
df_renamed = df.withColumnsRenamed({"Salary": "Base_Salary"})
df_renamed.show()

## Advanced Transformations and Actions in PySpark

In [None]:
from pyspark.sql import SparkSession

# Rebuild the session handle and the employee DataFrame so this section is self-contained.
spark = SparkSession.builder.appName("Basics").getOrCreate()

data = [
    ("John", "Sales", 3000),
    ("Jane", "Finance", 4000),
    ("Mike", "Sales", 3500),
    ("Alice", "Finance", 3800),
    ("Bob", "IT", 4500)
]
columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| John|     Sales|  3000|
| Jane|   Finance|  4000|
| Mike|     Sales|  3500|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
# Head-count per department (result column "count").
employees_per_dept = df.groupBy("Department").count()
employees_per_dept.show()

+----------+-----+
|Department|count|
+----------+-----+
|     Sales|    2|
|   Finance|    2|
|        IT|    1|
+----------+-----+



In [None]:
# Mean salary per department using the dict form of agg() ({column: function}),
# which produces the same "avg(Salary)" column name as .avg("Salary").
df.groupBy("Department").agg({"Salary": "avg"}).show()

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3250.0|
|   Finance|     3900.0|
|        IT|     4500.0|
+----------+-----------+



In [None]:
# Group by Department and calculate several aggregations in a single pass.
from pyspark.sql import functions as F

salary_stats = [
    F.avg("Salary").alias("Average Salary"),
    F.max("Salary").alias("Max Salary"),
    F.min("Salary").alias("Min Salary"),
]
df.groupBy("Department").agg(*salary_stats).show()

+----------+--------------+----------+----------+
|Department|Average Salary|Max Salary|Min Salary|
+----------+--------------+----------+----------+
|     Sales|        3250.0|      3500|      3000|
|   Finance|        3900.0|      4000|      3800|
|        IT|        4500.0|      4500|      4500|
+----------+--------------+----------+----------+



In [None]:
# Department -> building lookup, used to build a second DataFrame for joins.
dept_columns = ["Department", "Location"]
dept_data = list({
    "Sales": "Building A",
    "Finance": "Building B",
    "IT": "Building C",
}.items())

In [None]:
# Re-display the employee DataFrame next to the department lookup defined above.
df.show(n=20)

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
| John|     Sales|  3000|
| Jane|   Finance|  4000|
| Mike|     Sales|  3500|
|Alice|   Finance|  3800|
|  Bob|        IT|  4500|
+-----+----------+------+



In [None]:
dept_df = spark.createDataFrame(dept_data, schema=dept_columns)

# Inner join on the shared Department column; joining by column name keeps a
# single Department column in the result.
joined_df = df.join(dept_df, "Department", "inner")
joined_df.show()

+----------+-----+------+----------+
|Department| Name|Salary|  Location|
+----------+-----+------+----------+
|   Finance| Jane|  4000|Building B|
|   Finance|Alice|  3800|Building B|
|        IT|  Bob|  4500|Building C|
|     Sales| John|  3000|Building A|
|     Sales| Mike|  3500|Building A|
+----------+-----+------+----------+



In [None]:
# Inspect the department lookup DataFrame.
dept_df.show(n=20)

+----------+----------+
|Department|  Location|
+----------+----------+
|     Sales|Building A|
|   Finance|Building B|
|        IT|Building C|
+----------+----------+



In [None]:
# Employees: HR and Support have no matching row in the department table.
emp_cols = ["EmpID", "Name", "Department", "Salary"]
emp_data = [
    (1, "John", "Sales", 3000),
    (2, "Jane", "Finance", 4000),
    (3, "Mike", "Sales", 3500),
    (4, "Alice", "HR", 3800),
    (5, "Bob", "IT", 4500),
    (6, "Sam", "Support", 3200)
]
emp_df = spark.createDataFrame(emp_data, schema=emp_cols)

# Departments: Admin has no employees, so the join types below produce different results.
dept_cols = ["Department", "Location"]
dept_data = [
    ("Sales", "Building A"),
    ("Finance", "Building B"),
    ("IT", "Building C"),
    ("Admin", "Building D")
]
dept_df = spark.createDataFrame(dept_data, schema=dept_cols)

emp_df.show()
dept_df.show()

+-----+-----+----------+------+
|EmpID| Name|Department|Salary|
+-----+-----+----------+------+
|    1| John|     Sales|  3000|
|    2| Jane|   Finance|  4000|
|    3| Mike|     Sales|  3500|
|    4|Alice|        HR|  3800|
|    5|  Bob|        IT|  4500|
|    6|  Sam|   Support|  3200|
+-----+-----+----------+------+

+----------+----------+
|Department|  Location|
+----------+----------+
|     Sales|Building A|
|   Finance|Building B|
|        IT|Building C|
|     Admin|Building D|
+----------+----------+



In [None]:
# Inner join (the default): only departments present on both sides survive,
# so HR, Support and Admin drop out.
emp_df.join(dept_df, ["Department"]).show()

+----------+-----+----+------+----------+
|Department|EmpID|Name|Salary|  Location|
+----------+-----+----+------+----------+
|   Finance|    2|Jane|  4000|Building B|
|        IT|    5| Bob|  4500|Building C|
|     Sales|    1|John|  3000|Building A|
|     Sales|    3|Mike|  3500|Building A|
+----------+-----+----+------+----------+



In [None]:
# Left outer join: every employee is kept; Location is NULL where the department is unknown.
emp_df.join(dept_df, on=["Department"], how="left_outer").show()

+----------+-----+-----+------+----------+
|Department|EmpID| Name|Salary|  Location|
+----------+-----+-----+------+----------+
|     Sales|    1| John|  3000|Building A|
|     Sales|    3| Mike|  3500|Building A|
|   Finance|    2| Jane|  4000|Building B|
|        HR|    4|Alice|  3800|      NULL|
|        IT|    5|  Bob|  4500|Building C|
|   Support|    6|  Sam|  3200|      NULL|
+----------+-----+-----+------+----------+



In [None]:
# Right outer join: every department is kept; employee columns are NULL where nobody works there.
emp_df.join(dept_df, on=["Department"], how="right_outer").show()

+----------+-----+----+------+----------+
|Department|EmpID|Name|Salary|  Location|
+----------+-----+----+------+----------+
|     Sales|    3|Mike|  3500|Building A|
|     Sales|    1|John|  3000|Building A|
|   Finance|    2|Jane|  4000|Building B|
|     Admin| NULL|NULL|  NULL|Building D|
|        IT|    5| Bob|  4500|Building C|
+----------+-----+----+------+----------+



In [None]:
# Full outer join: all rows from both sides, e.g. Admin (no employees) and
# HR/Support (no location) appear with NULLs on the missing side.
emp_df.join(dept_df, on=["Department"], how="full_outer").show()

## Advanced PySpark (Window Functions)

In [None]:
from pyspark.sql import SparkSession
# Import only what the next cell needs. A wildcard import from
# pyspark.sql.functions would rebind Python built-ins such as sum, max, min,
# round and abs for the rest of the notebook.
from pyspark.sql.functions import rank

# getOrCreate() hands back the session started earlier if it is still running.
spark = SparkSession.builder.appName("AdvancedOps").getOrCreate()

# Rows mix scalar columns with an array column (subjects) and a map column (address).
data = [
    (1, "Alice", 2000, ["math", "science"], {"city": "NYC", "zip": "10001"}),
    (2, "Bob", 1500, ["english"], {"city": "SF", "zip": "94105"}),
    (3, "Charlie", 2200, ["math", "history", "science"], {"city": "NYC", "zip": "10001"}),
    (4, "David", 1200, ["art"], {"city": "LA", "zip": "90001"}),
]

df = spark.createDataFrame(data, schema=["id", "name", "salary", "subjects", "address"])
df.show(truncate=False)

+---+-------+------+------------------------+---------------------------+
|id |name   |salary|subjects                |address                    |
+---+-------+------+------------------------+---------------------------+
|1  |Alice  |2000  |[math, science]         |{zip -> 10001, city -> NYC}|
|2  |Bob    |1500  |[english]               |{zip -> 94105, city -> SF} |
|3  |Charlie|2200  |[math, history, science]|{zip -> 10001, city -> NYC}|
|4  |David  |1200  |[art]                   |{zip -> 90001, city -> LA} |
+---+-------+------+------------------------+---------------------------+



In [None]:
from pyspark.sql.window import Window

# Rank people by ascending salary within each city; "address.city" reads the
# "city" key of the address map column.
window_spec = (
    Window
    .partitionBy("address.city")
    .orderBy("salary")
)
df.withColumn("rank", rank().over(window_spec)).show()

+---+-------+------+--------------------+--------------------+----+
| id|   name|salary|            subjects|             address|rank|
+---+-------+------+--------------------+--------------------+----+
|  4|  David|  1200|               [art]|{zip -> 90001, ci...|   1|
|  1|  Alice|  2000|     [math, science]|{zip -> 10001, ci...|   1|
|  3|Charlie|  2200|[math, history, s...|{zip -> 10001, ci...|   2|
|  2|    Bob|  1500|           [english]|{zip -> 94105, ci...|   1|
+---+-------+------+--------------------+--------------------+----+



In [None]:
from pyspark.sql.window import Window
# NOTE: importing `max` and `sum` here rebinds Python's built-ins of the same
# name for the rest of the notebook; the window cells below use these Spark versions.
from pyspark.sql.functions import col, row_number, rank, dense_rank, max, sum, avg

# Employee data for the window-function examples: several people per department.
columns = ["EmpID", "Name", "Department", "Salary"]
data = [
    (1, "John", "Sales", 3000),
    (2, "Jane", "Finance", 4000),
    (3, "Mike", "Sales", 3500),
    (4, "Alice", "Finance", 3800),
    (5, "Bob", "IT", 4500),
    (6, "Tom", "Sales", 3700),
    (7, "Jerry", "Finance", 4200),
    (8, "Sam", "IT", 4700),
    (9, "Steve", "Sales", 3100),
    (10, "Rachel", "IT", 4600)
]
df = spark.createDataFrame(data, schema=columns)

df.show()

+-----+------+----------+------+
|EmpID|  Name|Department|Salary|
+-----+------+----------+------+
|    1|  John|     Sales|  3000|
|    2|  Jane|   Finance|  4000|
|    3|  Mike|     Sales|  3500|
|    4| Alice|   Finance|  3800|
|    5|   Bob|        IT|  4500|
|    6|   Tom|     Sales|  3700|
|    7| Jerry|   Finance|  4200|
|    8|   Sam|        IT|  4700|
|    9| Steve|     Sales|  3100|
|   10|Rachel|        IT|  4600|
+-----+------+----------+------+



In [None]:
# Rank employees by salary, highest first, within each department.
# rank() gives tied rows the same rank and leaves a gap after them.
windowSpec = Window.partitionBy("Department").orderBy(df["Salary"].desc())
df.withColumn("Rank", rank().over(windowSpec)).show()

+-----+------+----------+------+----+
|EmpID|  Name|Department|Salary|Rank|
+-----+------+----------+------+----+
|    7| Jerry|   Finance|  4200|   1|
|    2|  Jane|   Finance|  4000|   2|
|    4| Alice|   Finance|  3800|   3|
|    8|   Sam|        IT|  4700|   1|
|   10|Rachel|        IT|  4600|   2|
|    5|   Bob|        IT|  4500|   3|
|    6|   Tom|     Sales|  3700|   1|
|    3|  Mike|     Sales|  3500|   2|
|    9| Steve|     Sales|  3100|   3|
|    1|  John|     Sales|  3000|   4|
+-----+------+----------+------+----+



In [None]:
# Without orderBy the frame is the whole partition, so every row sees its department's maximum.
windowSpec = Window.partitionBy("Department")
df.withColumn(
    "MaxSalaryDept",
    max("Salary").over(windowSpec),
).show()

+-----+------+----------+------+-------------+
|EmpID|  Name|Department|Salary|MaxSalaryDept|
+-----+------+----------+------+-------------+
|    2|  Jane|   Finance|  4000|         4200|
|    4| Alice|   Finance|  3800|         4200|
|    7| Jerry|   Finance|  4200|         4200|
|    5|   Bob|        IT|  4500|         4700|
|    8|   Sam|        IT|  4700|         4700|
|   10|Rachel|        IT|  4600|         4700|
|    1|  John|     Sales|  3000|         3700|
|    3|  Mike|     Sales|  3500|         3700|
|    6|   Tom|     Sales|  3700|         3700|
|    9| Steve|     Sales|  3100|         3700|
+-----+------+----------+------+-------------+



In [None]:
# Running total: an explicit ROWS frame from the first row of the partition up to the current row.
windowSpec = (
    Window.partitionBy("Department")
    .orderBy("Salary")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.withColumn("CumulativeSalary", sum("Salary").over(windowSpec)).show()

+-----+------+----------+------+----------------+
|EmpID|  Name|Department|Salary|CumulativeSalary|
+-----+------+----------+------+----------------+
|    4| Alice|   Finance|  3800|            3800|
|    2|  Jane|   Finance|  4000|            7800|
|    7| Jerry|   Finance|  4200|           12000|
|    5|   Bob|        IT|  4500|            4500|
|   10|Rachel|        IT|  4600|            9100|
|    8|   Sam|        IT|  4700|           13800|
|    1|  John|     Sales|  3000|            3000|
|    9| Steve|     Sales|  3100|            6100|
|    3|  Mike|     Sales|  3500|            9600|
|    6|   Tom|     Sales|  3700|           13300|
+-----+------+----------+------+----------------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum

# With orderBy but no explicit frame, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING
# AND CURRENT ROW, so rows with equal Salary would be summed together. This data
# has no ties within a department, so the result matches the explicit ROWS version above.
window_base = Window.partitionBy("Department").orderBy("Salary")

running_total = sum("Salary").over(window_base)
df.withColumn("CumulativeSalary_Default", running_total).show()


+-----+------+----------+------+------------------------+
|EmpID|  Name|Department|Salary|CumulativeSalary_Default|
+-----+------+----------+------+------------------------+
|    4| Alice|   Finance|  3800|                    3800|
|    2|  Jane|   Finance|  4000|                    7800|
|    7| Jerry|   Finance|  4200|                   12000|
|    5|   Bob|        IT|  4500|                    4500|
|   10|Rachel|        IT|  4600|                    9100|
|    8|   Sam|        IT|  4700|                   13800|
|    1|  John|     Sales|  3000|                    3000|
|    9| Steve|     Sales|  3100|                    6100|
|    3|  Mike|     Sales|  3500|                    9600|
|    6|   Tom|     Sales|  3700|                   13300|
+-----+------+----------+------+------------------------+



In [None]:
# Unordered window: each row gets its department's total salary.
window_full_partition = Window.partitionBy(df["Department"])
df.withColumn(
    "TotalSalaryByDept",
    sum("Salary").over(window_full_partition),
).show()

+-----+------+----------+------+-----------------+
|EmpID|  Name|Department|Salary|TotalSalaryByDept|
+-----+------+----------+------+-----------------+
|    2|  Jane|   Finance|  4000|            12000|
|    4| Alice|   Finance|  3800|            12000|
|    7| Jerry|   Finance|  4200|            12000|
|    5|   Bob|        IT|  4500|            13800|
|    8|   Sam|        IT|  4700|            13800|
|   10|Rachel|        IT|  4600|            13800|
|    1|  John|     Sales|  3000|            13300|
|    3|  Mike|     Sales|  3500|            13300|
|    6|   Tom|     Sales|  3700|            13300|
|    9| Steve|     Sales|  3100|            13300|
+-----+------+----------+------+-----------------+



In [None]:
# A frame containing only the current row: the "sum" is just that row's Salary.
window_base = Window.partitionBy("Department").orderBy("Salary")
window_current = window_base.rowsBetween(Window.currentRow, Window.currentRow)
df.withColumn("Salary_CurrentOnly", sum("Salary").over(window_current)).show()

+-----+------+----------+------+------------------+
|EmpID|  Name|Department|Salary|Salary_CurrentOnly|
+-----+------+----------+------+------------------+
|    4| Alice|   Finance|  3800|              3800|
|    2|  Jane|   Finance|  4000|              4000|
|    7| Jerry|   Finance|  4200|              4200|
|    5|   Bob|        IT|  4500|              4500|
|   10|Rachel|        IT|  4600|              4600|
|    8|   Sam|        IT|  4700|              4700|
|    1|  John|     Sales|  3000|              3000|
|    9| Steve|     Sales|  3100|              3100|
|    3|  Mike|     Sales|  3500|              3500|
|    6|   Tom|     Sales|  3700|              3700|
+-----+------+----------+------+------------------+



In [None]:
# Sliding frame of the previous row plus the current row; the first row of each
# department has no predecessor, so its value is just its own Salary.
window_1prev = window_base.rowsBetween(-1, Window.currentRow)
df.withColumn("Rolling2Salary", sum("Salary").over(window_1prev)).show()

+-----+------+----------+------+--------------+
|EmpID|  Name|Department|Salary|Rolling2Salary|
+-----+------+----------+------+--------------+
|    4| Alice|   Finance|  3800|          3800|
|    2|  Jane|   Finance|  4000|          7800|
|    7| Jerry|   Finance|  4200|          8200|
|    5|   Bob|        IT|  4500|          4500|
|   10|Rachel|        IT|  4600|          9100|
|    8|   Sam|        IT|  4700|          9300|
|    1|  John|     Sales|  3000|          3000|
|    9| Steve|     Sales|  3100|          6100|
|    3|  Mike|     Sales|  3500|          6600|
|    6|   Tom|     Sales|  3700|          7200|
+-----+------+----------+------+--------------+



In [None]:
from pyspark.sql import SparkSession

# Rebuild the mixed-type DataFrame for the UDF examples below. There is no
# wildcard import from pyspark.sql.functions here: it would rebind Python
# built-ins (sum, max, min, round, abs, ...), and the UDF cells import what they need.
spark = SparkSession.builder.appName("AdvancedOps").getOrCreate()

data = [
    (1, "Alice", 2000, ["math", "science"], {"city": "NYC", "zip": "10001"}),
    (2, "Bob", 1500, ["english"], {"city": "SF", "zip": "94105"}),
    (3, "Charlie", 2200, ["math", "history", "science"], {"city": "NYC", "zip": "10001"}),
    (4, "David", 1200, ["art"], {"city": "LA", "zip": "90001"}),
]

df = spark.createDataFrame(data, schema=["id", "name", "salary", "subjects", "address"])
df.show(truncate=False)

+---+-------+------+------------------------+---------------------------+
|id |name   |salary|subjects                |address                    |
+---+-------+------+------------------------+---------------------------+
|1  |Alice  |2000  |[math, science]         |{zip -> 10001, city -> NYC}|
|2  |Bob    |1500  |[english]               |{zip -> 94105, city -> SF} |
|3  |Charlie|2200  |[math, history, science]|{zip -> 10001, city -> NYC}|
|4  |David  |1200  |[art]                   |{zip -> 90001, city -> LA} |
+---+-------+------+------------------------+---------------------------+



In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

@udf(IntegerType())
def subject_count(subjects):
    """Return the number of elements in an array cell, or None for a NULL array.

    Python UDFs receive SQL NULL as ``None``. ``len(None)`` raises TypeError inside
    the worker and fails the job, so NULL is passed through as NULL instead.
    NOTE(review): the built-in ``pyspark.sql.functions.size`` avoids the Python
    round-trip, but its NULL handling differs; check the docs before switching.
    """
    if subjects is None:
        return None
    return len(subjects)

# Apply the row-at-a-time Python UDF to the `subjects` array column.
df.withColumn("subject_count", subject_count("subjects")).show()

+---+-------+------+--------------------+--------------------+-------------+
| id|   name|salary|            subjects|             address|subject_count|
+---+-------+------+--------------------+--------------------+-------------+
|  1|  Alice|  2000|     [math, science]|{zip -> 10001, ci...|            2|
|  2|    Bob|  1500|           [english]|{zip -> 94105, ci...|            1|
|  3|Charlie|  2200|[math, history, s...|{zip -> 10001, ci...|            3|
|  4|  David|  1200|               [art]|{zip -> 90001, ci...|            1|
+---+-------+------+--------------------+--------------------+-------------+



In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def multiply_by_two(s: pd.Series) -> pd.Series:
    """Vectorized UDF: take a batch of values as a pandas Series and double each element."""
    return s.mul(2)

# Apply the pandas UDF; it is called on batches of rows rather than one row at a time.
df.withColumn("double_salary", multiply_by_two("salary")).show()

+---+-------+------+--------------------+--------------------+-------------+
| id|   name|salary|            subjects|             address|double_salary|
+---+-------+------+--------------------+--------------------+-------------+
|  1|  Alice|  2000|     [math, science]|{zip -> 10001, ci...|       4000.0|
|  2|    Bob|  1500|           [english]|{zip -> 94105, ci...|       3000.0|
|  3|Charlie|  2200|[math, history, s...|{zip -> 10001, ci...|       4400.0|
|  4|  David|  1200|               [art]|{zip -> 90001, ci...|       2400.0|
+---+-------+------+--------------------+--------------------+-------------+



## Handling Complex Nested Schemas

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.appName("AdvancedOps").getOrCreate()

# Each person has an array of skills; Mike's array is empty, which matters for explode().
columns = ["Name", "Skills"]
data = [
    ("John", ["Python", "Java"]),
    ("Jane", ["SQL", "R", "Scala"]),
    ("Mike", [])
]

df = spark.createDataFrame(data, schema=columns)
df.show(truncate=False)

+----+---------------+
|Name|Skills         |
+----+---------------+
|John|[Python, Java] |
|Jane|[SQL, R, Scala]|
|Mike|[]             |
+----+---------------+



In [None]:
# explode() emits one output row per array element. A row with an empty array
# (Mike) produces no output; explode_outer() would keep it with a NULL Skill.
df_exploded = df.withColumn("Skill", explode("Skills"))
df_exploded.show()

+----+---------------+------+
|Name|         Skills| Skill|
+----+---------------+------+
|John| [Python, Java]|Python|
|John| [Python, Java]|  Java|
|Jane|[SQL, R, Scala]|   SQL|
|Jane|[SQL, R, Scala]|     R|
|Jane|[SQL, R, Scala]| Scala|
+----+---------------+------+



In [None]:
# Register df as a temporary view so the SQL query below can reference it as `people`.
df.createOrReplaceTempView(name="people")

In [None]:
# SQL counterpart of explode(): LATERAL VIEW generates one row per array element.
# Like explode(), it drops rows whose array is empty; LATERAL VIEW OUTER would keep them.
lateral_query = """
    SELECT Name, Skill
    FROM people
    LATERAL VIEW explode(Skills) AS Skill
"""
df_lateral = spark.sql(lateral_query)
df_lateral.show()

+----+------+
|Name| Skill|
+----+------+
|John|Python|
|John|  Java|
|Jane|   SQL|
|Jane|     R|
|Jane| Scala|
+----+------+



### Pivot and Unpivot Data

In [None]:
# Monthly sales in long format: one row per (Product, Month) pair.
columns = ["Product", "Month", "Sales"]
data = [
    ("ProductA", "Jan", 100),
    ("ProductA", "Feb", 150),
    ("ProductA", "Mar", 120),
    ("ProductB", "Jan", 200),
    ("ProductB", "Feb", 230),
    ("ProductB", "Mar", 210),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

+--------+-----+-----+
| Product|Month|Sales|
+--------+-----+-----+
|ProductA|  Jan|  100|
|ProductA|  Feb|  150|
|ProductA|  Mar|  120|
|ProductB|  Jan|  200|
|ProductB|  Feb|  230|
|ProductB|  Mar|  210|
+--------+-----+-----+



In [None]:
# Pivot: one row per product, one column per month, with Sales summed per cell.
# Listing the pivot values explicitly skips the extra job Spark otherwise runs to
# find the distinct months. It also puts the columns in calendar order; without
# the list they come out alphabetically (Feb, Jan, Mar).
pivot_df = df.groupBy("Product").pivot("Month", ["Jan", "Feb", "Mar"]).sum("Sales")
pivot_df.show()

+--------+---+---+---+
| Product|Feb|Jan|Mar|
+--------+---+---+---+
|ProductB|230|200|210|
|ProductA|150|100|120|
+--------+---+---+---+



In [None]:
# First pivot to wide format. Explicit month values avoid an extra distinct-values
# job and fix the column order.
wide_df = df.groupBy("Product").pivot("Month", ["Jan", "Feb", "Mar"]).sum("Sales")

# Then unpivot back to long format: stack(n, label1, col1, ...) emits n rows per
# input row, one (Month, Sales) pair for each label/column pair.
unpivot_df = wide_df.selectExpr(
    "Product",
    "stack(3, 'Jan', Jan, 'Feb', Feb, 'Mar', Mar) as (Month, Sales)"
)
unpivot_df.show()

+--------+-----+-----+
| Product|Month|Sales|
+--------+-----+-----+
|ProductB|  Jan|  200|
|ProductB|  Feb|  230|
|ProductB|  Mar|  210|
|ProductA|  Jan|  100|
|ProductA|  Feb|  150|
|ProductA|  Mar|  120|
+--------+-----+-----+

