In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("app1").getOrCreate()

# **1. Create an empty DataFrame**
#### *You can create an empty DataFrame using spark.createDataFrame with no data.*

In [7]:
#Empty DataFrame with no data
empty_df = spark.createDataFrame([],"id Int, name String")
empty_df.show()

+---+----+
| id|name|
+---+----+
+---+----+



# 2. **Convert RDD to DataFrame**
#### *To convert an RDD to DataFrame, you need to define the schema*

In [10]:
rdd = spark.sparkContext.parallelize([(1,"Alice"),(2,"Bob")])
columns = ["id", "name"]
df_from_rdd = rdd.toDF(columns)
df_from_rdd.show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



# **3. Convert DataFrame to Pandas**
#### *You can convert a PySpark DataFrame to a Pandas DataFrame using toPandas()*


In [11]:
pandas_df = df_from_rdd.toPandas()
print(pandas_df)

   id   name
0   1  Alice
1   2    Bob


# **4. show()**
#### *The show() method displays the first n rows of a DataFrame*

In [12]:
df_from_rdd.show(5) #display first 5 rows

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



# **5. StructType & StructField**
#### *These classes are used to define the schema for DataFrames.*

In [13]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [14]:
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name",StringType(), True)
])

data_required = [(1,"Alice"), (2,"Bob")]
df_with_schema = spark.createDataFrame(data_required, schema)
df_with_schema.show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



# **6. Column Class**
#### *The column class represents a column in a DataFrame and is used for performing operations.*

In [15]:
from pyspark.sql import functions as F
df_with_column = df_from_rdd.withColumn("upper_name", F.upper("name"))
df_with_column.show()

+---+-----+----------+
| id| name|upper_name|
+---+-----+----------+
|  1|Alice|     ALICE|
|  2|  Bob|       BOB|
+---+-----+----------+



# **7. select**
#### *The select() method is used to select specific columns.*

In [16]:
df_from_rdd.select("id").show() #selected only id column

+---+
| id|
+---+
|  1|
|  2|
+---+



# **8. collect**
#### *collect() returns all the rows as a list of Row objects.*

In [17]:
rows = df_from_rdd.collect()
print(rows)

[Row(id=1, name='Alice'), Row(id=2, name='Bob')]


# 9. **withColumn()**
#### *This method is used to add or modify a column*

In [18]:
df_with_new_col = df_from_rdd.withColumn("id_squared", F.col("id")**2)
df_with_new_col.show()

+---+-----+----------+
| id| name|id_squared|
+---+-----+----------+
|  1|Alice|       1.0|
|  2|  Bob|       4.0|
+---+-----+----------+



# 10. **withColumnRenamed()**
#### *Renames an existing column in the DataFrame*

In [19]:
df_renamed = df_from_rdd.withColumnRenamed("name", "full_name")
df_renamed.show()

+---+---------+
| id|full_name|
+---+---------+
|  1|    Alice|
|  2|      Bob|
+---+---------+



# **11.where() & filter()**
#### *Both methods are used to filter rows based on conditions.*

In [21]:
df_filtered = df_from_rdd.where(F.col("id")>1)
df_filtered.show()

#alternatively, use filter()
df_filtered = df_from_rdd.filter(F.col("id")>1)
df_filtered.show()

+---+----+
| id|name|
+---+----+
|  2| Bob|
+---+----+

+---+----+
| id|name|
+---+----+
|  2| Bob|
+---+----+



# **12. drop() & dropDuplicates()**
#### *Used to drop a column or remove duplicate rows*

In [22]:
#Dropping a column
df_dropped = df_from_rdd.drop("name")
df_dropped.show()

#Removing duplicates
df_no_duplicates = df_from_rdd.dropDuplicates()
df_no_duplicates.show()

+---+
| id|
+---+
|  1|
|  2|
+---+

+---+-----+
| id| name|
+---+-----+
|  2|  Bob|
|  1|Alice|
+---+-----+



# **13. orderBy() and sort()**
#### *These methods are used for sorting data in DataFrame*

In [23]:
df_sorted = df_from_rdd.orderBy("id", ascending=False)
df_sorted.show()

#Equivalent to orderBy()
df_sorted2 = df_from_rdd.sort("id")
df_sorted2.show()

+---+-----+
| id| name|
+---+-----+
|  2|  Bob|
|  1|Alice|
+---+-----+

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



# 14. **groupBy()**
#### *Used for group-by operations*

In [24]:
df_grouped= df_from_rdd.groupBy("id").count()
df_grouped.show()

+---+-----+
| id|count|
+---+-----+
|  1|    1|
|  2|    1|
+---+-----+



# **15. join()**
#### *Used for joining DataFrames.*

In [25]:
df2= spark.createDataFrame(
    data= [(1,"Math"),(2,"Science")],
    schema=["id","name"]
)
df_joined = df_from_rdd.join(df2, on="id")
df_joined.show()

+---+-----+-------+
| id| name|   name|
+---+-----+-------+
|  1|Alice|   Math|
|  2|  Bob|Science|
+---+-----+-------+



# 16. **union() & unionAll()**
#### *Both methods combine DataFrames, but unionAll is deprecated in favor of union()*

In [29]:
df3= spark.createDataFrame(
    data= [(3,"Charlie")],
    schema=["id","name"]
)
df_union = df_from_rdd.union(df3)
df_union.show()


+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



# **17. unionByName()**
#### *Union DataFrames by column name*

In [30]:
df_union_by_name = df_from_rdd.unionByName(df2)
df_union_by_name.show()

+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  1|   Math|
|  2|Science|
+---+-------+



# **18. UDF (User Defined Function)**
#### *UDFs are used to extend the functionality of Spark DataFrame with custom logic*

In [31]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def add_exclamation(name):
  return name + "!"

add_udf = udf(add_exclamation, StringType())
df_udf = df_from_rdd.withColumn("excited_name", add_udf("name"))
df_udf.show()

+---+-----+------------+
| id| name|excited_name|
+---+-----+------------+
|  1|Alice|      Alice!|
|  2|  Bob|        Bob!|
+---+-----+------------+



# **19. transform()**
#### *transform() is used to apply transformations to a DataFrame*

In [33]:
df_transformed = df_from_rdd.transform(lambda df: df.withColumn("id_squared", df["id"]**2))
df_transformed.show()

+---+-----+----------+
| id| name|id_squared|
+---+-----+----------+
|  1|Alice|       1.0|
|  2|  Bob|       4.0|
+---+-----+----------+



# 20. apply()
#### transform() is used to apply transformations to a DataFrame.

In [35]:
df_applied = df_from_rdd.rdd.map(lambda row: (row.id*2, row.name)).toDF(["id","name"])
df_applied.show()

+---+-----+
| id| name|
+---+-----+
|  2|Alice|
|  4|  Bob|
+---+-----+



# **21. map()**
#### *map() is used on an RDD to apply a function on each element*

In [36]:
rdd_mapped = df_from_rdd.rdd.map(lambda x: (x.id*2, x.name))
df_mapped = rdd_mapped.toDF(["id","name"])
df_mapped.show()

+---+-----+
| id| name|
+---+-----+
|  2|Alice|
|  4|  Bob|
+---+-----+



# **22. flatMap()**
#### *Used to flaten a collection on items*

In [56]:
rdd_flat = df_from_rdd.rdd.flatMap(lambda x: [(x.id, x.name), (x.id*10, x.name)])
print(rdd_flat.collect())
df_flat = rdd_flat.toDF(["id","name"])
df_flat.show()

[(1, 'Alice'), (10, 'Alice'), (2, 'Bob'), (20, 'Bob')]
+---+-----+
| id| name|
+---+-----+
|  1|Alice|
| 10|Alice|
|  2|  Bob|
| 20|  Bob|
+---+-----+



In [55]:
# Example list of sentences
sentences = [
    "Hello world",
    "This is Spark",
    "flatMap example",
    "Filter me out"
]
rdd2= spark.sparkContext.parallelize(sentences)
rdd_filtered  = rdd2.flatMap(lambda sentence: [(word,) for word in sentence.split(" ") if word!="me"])
df_required_filtered= rdd_filtered.toDF(["word"])
df_required_filtered.show()

+-------+
|   word|
+-------+
|  Hello|
|  world|
|   This|
|     is|
|  Spark|
|flatMap|
|example|
| Filter|
|    out|
+-------+



In [43]:
rdd2.collect()

['Hello world', 'This is Spark', 'flatMap example', 'Filter me out']

In [51]:
rdd_filtered  = rdd2.flatMap(lambda sentence: [(word,) for word in sentence.split(" ") if word!="me"])

In [52]:
rdd_filtered.collect()

[('Hello',),
 ('world',),
 ('This',),
 ('is',),
 ('Spark',),
 ('flatMap',),
 ('example',),
 ('Filter',),
 ('out',)]

In [54]:
df_required_filtered= rdd_filtered.toDF(["word"])
df_required_filtered.show()

+-------+
|   word|
+-------+
|  Hello|
|  world|
|   This|
|     is|
|  Spark|
|flatMap|
|example|
| Filter|
|    out|
+-------+



# **23. foreach()**
#### *foreach() is used for applying a function to each row in the DataFrame.*

In [71]:
def print_row(row):
  print(row)
df_from_rdd.foreach(print_row)


# **24. sample() vs sampleBy()**
#### *sample() is used for random.sampling, while sampleBy() allows sampling with satisfaction*

In [76]:
#sample()
df_sample = df_from_rdd.sample(fraction=0.5)
df_sample.show()

#sampleBy()
df_sample_by = df_from_rdd.sampleBy("id", fractions={1:0.5,2:0.5})
df_sample_by.show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+

+---+----+
| id|name|
+---+----+
|  2| Bob|
+---+----+



# **25. fillna()**
*Used for handling missing values.*

In [83]:
#fillna()
df_null = spark.createDataFrame(
    data=[(1, None), (2, "sanjeev2"),(None, None), (None, "sanjeev4")],
    schema= ["id","name"]
)
df_fillna = df_null.fillna({"id":0, "name":"Unknown"})
df_fillna.show()

+---+--------+
| id|    name|
+---+--------+
|  1| Unknown|
|  2|sanjeev2|
|  0| Unknown|
|  0|sanjeev4|
+---+--------+



# **26. pivot() (Row to Column)**
#### *Used to pivot data (convert rows to columns).*

In [84]:
df_pivoted = df_from_rdd.groupBy("id").pivot("name").agg({"id":"count"})
df_pivoted.show()

+---+-----+----+
| id|Alice| Bob|
+---+-----+----+
|  1|    1|NULL|
|  2| NULL|   1|
+---+-----+----+



# **27. repartitionByRange() and repartition()**
#### *Re-Partitioning the data by one or more columns for distributed processing.*

In [95]:
df_for_partitioning = spark.createDataFrame(
    data=[(1, "A"), (2, "B"),(3, "C"), (4, "D"), (5, "E"), (6, "F")],
    schema= ["id","name"]
)

#repartitionByRange
df_repartitionByRange = df_for_partitioning.repartitionByRange(2,"id")
print(df_repartitionByRange.rdd.getNumPartitions())
df_repartitionByRange.withColumn("partition_id", F.spark_partition_id()).show()

#repartition
df_repartition = df_for_partitioning.repartition(2,"id")
print(df_repartition.rdd.getNumPartitions())
df_repartition.withColumn("partition_id", F.spark_partition_id()).show()

2
+---+----+------------+
| id|name|partition_id|
+---+----+------------+
|  1|   A|           0|
|  2|   B|           0|
|  3|   C|           0|
|  4|   D|           1|
|  5|   E|           1|
|  6|   F|           1|
+---+----+------------+

2
+---+----+------------+
| id|name|partition_id|
+---+----+------------+
|  2|   B|           0|
|  4|   D|           0|
|  5|   E|           0|
|  1|   A|           1|
|  3|   C|           1|
|  6|   F|           1|
+---+----+------------+



# 28. **MapType (Map / Dict)**
#### *MapType is used for columns that represent key-value pairs*

In [96]:
from pyspark.sql.types import MapType, StringType

data = [(1, {"name": "Alice","age":"25"}), (2,{"name":"Bob","age":"30"})]
schema = StructType([
    StructField("id",IntegerType(),True),
    StructField("info", MapType(StringType(), StringType()), True)
])
df_map = spark.createDataFrame(data, schema)
df_map.show()

+---+--------------------+
| id|                info|
+---+--------------------+
|  1|{name -> Alice, a...|
|  2|{name -> Bob, age...|
+---+--------------------+

