In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("IndexColumn")
    .getOrCreate()
)

# Three (Name, Age) sample rows and the matching column names.
data = [
    ("Alice", 30),
    ("Bob", 25),
    ("Charlie", 35),
]
columns = ["Name", "Age"]

# Build the DataFrame from the sample rows.
df = spark.createDataFrame(data, schema=columns)

# Append an "Index" column of generated IDs. In the recorded output below the
# IDs increase from row to row but are NOT consecutive (17179869184, ...).
df_with_index = df.withColumn(
    "Index",
    monotonically_increasing_id(),
)

# Display the result.
df_with_index.show()

+-------+---+-----------+
|   Name|Age|      Index|
+-------+---+-----------+
|  Alice| 30|17179869184|
|    Bob| 25|42949672960|
|Charlie| 35|60129542144|
+-------+---+-----------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Window that orders rows by "Name". No partitionBy is given, so the numbering
# below runs across the whole DataFrame rather than restarting per group.
windowSpec = Window.orderBy("Name")

# Number the rows 1, 2, 3, ... in "Name" order and store that as "Index".
# `df` is the DataFrame built in the previous cell.
df_with_seq_index = df.withColumn(
    "Index",
    row_number().over(windowSpec),
)

# Display the result.
df_with_seq_index.show()


+-------+---+-----+
|   Name|Age|Index|
+-------+---+-----+
|  Alice| 30|    1|
|    Bob| 25|    2|
|Charlie| 35|    3|
+-------+---+-----+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when

# Initialize Spark Session
spark = SparkSession.builder.appName("Top2FrequentValues").getOrCreate()

# Sample data as (ID, Category) pairs. Apple occurs 3 times; Banana, Orange and
# Grape occur twice each; Mango occurs once.
data = [
    (1, "Apple"), (2, "Banana"), (3, "Apple"), (4, "Orange"),
    (5, "Apple"), (6, "Banana"), (7, "Orange"), (8, "Grape"),
    (9, "Grape"), (10, "Mango")
]

# Create DataFrame
df = spark.createDataFrame(data, ["ID", "Category"])

# Count frequency of each category
category_counts = df.groupBy("Category").count()

# Get the top 2 most frequent categories.
# Banana, Orange and Grape tie for second place, so "Category" is used as a
# secondary sort key: ordering by count alone leaves the order of tied rows
# unspecified, and limit(2) could then keep a different category on each run.
top_categories = (
    category_counts
    .orderBy(col("count").desc(), col("Category").asc())
    .limit(2)
)

# Extract top categories as a list (at most 2 rows, so collect() is cheap)
top_categories_list = [row["Category"] for row in top_categories.collect()]

# Replace all non-top categories with "Other"
df_modified = df.withColumn(
    "Category",
    when(col("Category").isin(top_categories_list), col("Category")).otherwise("Other")
)

# Show the result
df_modified.show()


+---+--------+
| ID|Category|
+---+--------+
|  1|   Apple|
|  2|  Banana|
|  3|   Apple|
|  4|   Other|
|  5|   Apple|
|  6|  Banana|
|  7|   Other|
|  8|   Other|
|  9|   Other|
| 10|   Other|
+---+--------+



In [0]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("RenameColumns")
    .getOrCreate()
)

# Sample rows: one string, one integer and one string per row.
data = [
    ("John", 30, "NY"),
    ("Anna", 40, "LA"),
    ("Mike", 50, "SF"),
]

# Build the DataFrame under its original, generic column names.
old_columns = ["col1", "col2", "col3"]
df = spark.createDataFrame(data, schema=old_columns)

# Target names, listed in the same order as old_columns.
new_columns = ["name", "age", "address"]

# Pair old and new names by position and rename one column per iteration.
for old_col, new_col in zip(old_columns, new_columns):
    df = df.withColumnRenamed(existing=old_col, new=new_col)

# Display the renamed DataFrame.
df.show()


+----+---+-------+
|name|age|address|
+----+---+-------+
|John| 30|     NY|
|Anna| 40|     LA|
|Mike| 50|     SF|
+----+---+-------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("ExtractItemsAtPositions")
    .getOrCreate()
)

# Sample rows: an ID plus a four-element list of integers.
data = [
    (1, [10, 20, 30, 40]),
    (2, [50, 60, 70, 80]),
    (3, [90, 100, 110, 120]),
]

# Build the DataFrame; "Values" holds the list.
df = spark.createDataFrame(data, schema=["ID", "Values"])

# Pull the first, second and third list elements (positions 0, 1 and 2) into
# their own columns with getItem().
df_extracted = (
    df
    .withColumn("First_Item", col("Values").getItem(0))
    .withColumn("Second_Item", col("Values").getItem(1))
    .withColumn("Third_Item", col("Values").getItem(2))
)

# Display the result.
df_extracted.show()


+---+-------------------+----------+-----------+----------+
| ID|             Values|First_Item|Second_Item|Third_Item|
+---+-------------------+----------+-----------+----------+
|  1|   [10, 20, 30, 40]|        10|         20|        30|
|  2|   [50, 60, 70, 80]|        50|         60|        70|
|  3|[90, 100, 110, 120]|        90|        100|       110|
+---+-------------------+----------+-----------+----------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, substring
from pyspark.sql.functions import initcap
# Initialize Spark Session
spark = SparkSession.builder.appName("CapitalizeFirstCharacter").getOrCreate()

# Sample Data
data = [(1, "john"), (2, "alice"), (3, "bob"), (4, "carol")]

# Create DataFrame
df = spark.createDataFrame(data, ["ID", "Name"])

# Capitalize the 'Name' column with initcap(), which upper-cases the first
# letter of each word and lower-cases the remaining letters.
#
# The column is referenced by its real name, "Name". The previous version used
# "name", which only resolved through case-insensitive column matching and
# renamed the column to lowercase as a side effect. The lowercase output name
# is kept (it matches the recorded output) but is now an explicit rename; drop
# the withColumnRenamed() call to keep the column called "Name".
df_modified = (
    df
    .withColumn("Name", initcap(col("Name")))
    .withColumnRenamed("Name", "name")
)

# Show the result
df_modified.show()

+---+-----+
| ID| name|
+---+-----+
|  1| John|
|  2|Alice|
|  3|  Bob|
|  4|Carol|
+---+-----+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, split

# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("WordLength")
    .getOrCreate()
)

# Sample rows: an ID plus a short sentence.
data = [
    (1, "PySpark is great"),
    (2, "Big Data is amazing"),
    (3, "Data Engineering Rocks"),
]

# Build the DataFrame.
df = spark.createDataFrame(data, schema=["ID", "Sentence"])

# Break each sentence into an array of words on single spaces.
df_split = df.withColumn("Words", split(col("Sentence"), " "))

# Add Word_1_Length .. Word_3_Length holding the length of the words at
# positions 0, 1 and 2. Only the first three words are measured, so the fourth
# word of "Big Data is amazing" gets no column.
df_lengths = df_split.select(
    "*",
    *[
        length(col("Words").getItem(position)).alias(f"Word_{position + 1}_Length")
        for position in range(3)
    ],
)

# Display the result.
df_lengths.show()

+---+--------------------+--------------------+-------------+-------------+-------------+
| ID|            Sentence|               Words|Word_1_Length|Word_2_Length|Word_3_Length|
+---+--------------------+--------------------+-------------+-------------+-------------+
|  1|    PySpark is great|[PySpark, is, great]|            7|            2|            5|
|  2| Big Data is amazing|[Big, Data, is, a...|            3|            4|            2|
|  3|Data Engineering ...|[Data, Engineerin...|            4|           11|            5|
+---+--------------------+--------------------+-------------+-------------+-------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("DifferenceOfDifferences")
    .getOrCreate()
)

# Sample (ID, Value) rows.
data = [
    (1, 5),
    (2, 8),
    (3, 12),
    (4, 15),
    (5, 20),
]

# Build the DataFrame.
df = spark.createDataFrame(data, schema=["ID", "Value"])

# Rows are visited in "ID" order; lag() below looks back along this ordering.
windowSpec = Window.orderBy("ID")

# Diff = this row's Value minus the previous row's Value. The first row has no
# predecessor, so its Diff is NULL in the output.
df_diff = df.withColumn(
    "Diff",
    F.col("Value") - F.lag("Value").over(windowSpec),
)

# Diff_of_Diff = this row's Diff minus the previous row's Diff. The first two
# rows are NULL because one side of the subtraction is NULL.
df_diff_of_diff = df_diff.withColumn(
    "Diff_of_Diff",
    F.col("Diff") - F.lag("Diff").over(windowSpec),
)

# Display the result.
df_diff_of_diff.show()


+---+-----+----+------------+
| ID|Value|Diff|Diff_of_Diff|
+---+-----+----+------------+
|  1|    5|NULL|        NULL|
|  2|    8|   3|        NULL|
|  3|   12|   4|           1|
|  4|   15|   3|          -1|
|  5|   20|   5|           2|
+---+-----+----+------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("FilterWordsWithVowels")
    .getOrCreate()
)

# Sample (ID, Word) rows.
data = [
    (1, "Apple"),
    (2, "Banana"),
    (3, "Sky"),
    (4, "Grapes"),
    (5, "Cat"),
]

# Build the DataFrame.
df = spark.createDataFrame(data, schema=["ID", "Word"])

# Define UDF to check if a word contains at least 2 vowels
def contains_two_vowels(word):
    """Return True when ``word`` contains at least two vowels (a, e, i, o, u).

    The check is case-insensitive and repeated vowels count each time, so
    "Banana" qualifies.

    Args:
        word: The string to inspect, or None (a null cell when this function
            is applied as a Spark UDF).

    Returns:
        bool: True if two or more characters of ``word`` are vowels; False
        otherwise, including for None and for the empty string.
    """
    # A null cell arrives as None; treat it as "no vowels" instead of letting
    # None.lower() raise AttributeError.
    if word is None:
        return False

    vowels = "aeiou"
    vowel_count = 0
    # Counted with an explicit loop rather than the builtin sum(): a later cell
    # in this notebook runs `from pyspark.sql.functions import sum`, which
    # rebinds the global name `sum` that this function would otherwise look up.
    for char in word.lower():
        if char in vowels:
            vowel_count += 1
            if vowel_count >= 2:
                return True
    return False

# Wrap the Python predicate as a Spark UDF that returns a boolean.
contains_two_vowels_udf = udf(contains_two_vowels, returnType=BooleanType())

# Keep only the rows whose "Word" passes the predicate.
df_filtered = df.where(contains_two_vowels_udf(df["Word"]))

# Display the result.
df_filtered.show()

+---+------+
| ID|  Word|
+---+------+
|  1| Apple|
|  2|Banana|
|  4|Grapes|
+---+------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Start (or reuse) the Spark session used by this example.
spark = (
    SparkSession.builder
    .appName("PivotDataFrame")
    .getOrCreate()
)

# Sample (ID, Category, Value) rows: each ID has one "A" row and one "B" row.
data = [
    (1, "A", 10),
    (1, "B", 20),
    (2, "A", 30),
    (2, "B", 40),
    (3, "A", 50),
    (3, "B", 60),
]

# Build the DataFrame.
df = spark.createDataFrame(data, schema=["ID", "Category", "Value"])

# One output row per ID and one output column per distinct Category, each cell
# holding the summed Value. NOTE: `sum` here is pyspark.sql.functions.sum
# (imported at the top of this cell), not Python's builtin sum.
pivot_df = (
    df
    .groupBy("ID")
    .pivot("Category")
    .agg(sum("Value"))
)

# Display the pivoted DataFrame (row order is not sorted).
pivot_df.show()

+---+---+---+
| ID|  A|  B|
+---+---+---+
|  1| 10| 20|
|  3| 50| 60|
|  2| 30| 40|
+---+---+---+

