In [0]:
import pandas as pd

In [0]:
spark.range(10).show()

In [0]:
spark.sql("SELECT 1+4").show()

In [0]:
# Load the Titanic passenger CSV: the first row is the header and the
# column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/workspace/default/kiran_data/titanic.csv")
)
df.printSchema()

In [0]:
df.show(5)

In [0]:
df.select("PassengerId", "Survived", "Name", "Ticket").show(5)

In [0]:
df.count()

# Filter Rows

In [0]:
df.filter(df["Sex"] == "female").count()

In [0]:
from pyspark.sql.functions import col, lit, when

df.filter(col('Sex')== 'female').show()

In [0]:
df.filter(df["Sex"] != "female").show(5)

In [0]:
df.filter((df.Pclass == 1) | (df.Pclass == 2)).show(5) #or operator

In [0]:
df.filter((df["Sex"] == "female") & (df["Age"] >= 20)).show(5) #& operator

In [0]:
df.filter(df.Age.isNull()).show()

In [0]:
df.filter(df.Age.isin([40, 41, 42])).show()

In [0]:
# Names starting with 'A'
df.filter(df.Name.like("A%")).show(5)


In [0]:
# Names ending with 'y'
df.filter(df.Name.rlike("y$")).show(5)

In [0]:
df.filter(df.Name.rlike("^S")).show(5) #name starts with 'S'

In [0]:
df.filter(df.Name.startswith("C")).show(5) #startswith

In [0]:
df.filter(df.Name.contains("ar")).show(5)

In [0]:
# Age between 20 and 40
df.filter(df.Age.between(20,40)).show(5)


# Filter with conditional column

In [0]:
# withColumn: add a derived column, AdultFlag, computed from Age.
#
# A comparison against a null Age evaluates to null, so such rows would fall
# through to otherwise(). They are labelled "Unknown" explicitly instead of
# being silently counted as minors.
df = df.withColumn(
    "AdultFlag",
    when(col("Age") >= 18, "Adult")
    .when(col("Age") < 18, "Minor")
    .otherwise("Unknown"),
)
# Filter adults
df.filter(df.AdultFlag == "Adult").show(5)


#Filter using literal values

In [0]:
# Fare greater than 100
df.filter(col("Fare") > lit(100)).show(5)

✅ Key Points

Use lit() whenever you want to treat a constant as a column in DataFrame operations.

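Required wherever the API expects a Column rather than a plain Python value — for example, adding a constant column with `.withColumn("One", lit(1))`. In comparisons and `.when()` expressions PySpark converts Python literals automatically, so `lit()` is optional there.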

Alternative to lit(): some simple operations allow Python literals directly, but using lit() is safer and explicit, especially for complex transformations.

In [0]:
from pyspark.sql.functions import lower
# Female passengers, ignoring case
df.filter(lower(col("Sex")) == "female").show(5)


In [0]:
# Specific passenger IDs
df.filter(df.PassengerId.isin([1,5,10])).show(5)


# Filter top N using sort + limit

In [0]:
# Top 5 passengers by Fare
df.orderBy(col("Fare").desc()).limit(5).show()

limit(5) Purpose: Creates a new DataFrame with only the first 5 rows.

Effect on DataFrame: Returns a subset DataFrame. Can be stored, transformed, or written to disk.

Use case: You want to actually restrict data for further processing.

In [0]:
# Register the DataFrame as a temporary SQL view so it can be queried with spark.sql().
# A temp view exists only for the current Spark session; it disappears when
# the notebook/session is closed.
df.createOrReplaceTempView("titanic_table")
spark.sql("SELECT * FROM titanic_table WHERE Sex='female' AND Age>30").show(5)

In [0]:
spark.sql("select * from titanic_table").show(10)

💡 Pro Tip:

.show() is standard PySpark → always works anywhere.

display() is Databricks-only → useful for demos, dashboards, and exploratory analysis.

In [0]:
# Read the 2011 flight summary CSV, treating the first row as the header
# and letting Spark infer the column types.
flightData2011 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("/Volumes/workspace/default/kiran_data/2011-summary.csv")
)

In [0]:
flightData2011.take(3) #collecting a df

In [0]:
# explain() can be called on any DataFrame object to see the DataFrame's
# lineage (i.e. how Spark will execute this query).
flightData2011.sort("count").explain()

In [0]:
# By default, when Spark performs a shuffle it outputs 200 shuffle partitions.
# Set this value to 5 to reduce the number of output partitions from the shuffle.
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [0]:
flightData2011.sort("count").take(2)

In [0]:
flightData2011.createOrReplaceTempView("flight_data_2011")

In [0]:
sqlWay = spark.sql("""SELECT DEST_COUNTRY_NAME, count(1)
          from flight_data_2011
          group by DEST_COUNTRY_NAME
          """)

In [0]:
sqlWay.explain()

In [0]:
# Count rows per destination country with the DataFrame API, then print the
# plan Spark will use to run the aggregation.
dataFrameWay = flightData2011.groupBy("DEST_COUNTRY_NAME").count()
dataFrameWay.explain()

In [0]:
#maximum flight to and from any given location.
flightData2011.show(10)

In [0]:
spark.sql("""SELECT max(count) from flight_data_2011""").take(1)

In [0]:
df = spark.range(200).toDF("number")

In [0]:
df.select(df["number"] + 30).show()

In [0]:
spark.range(2).collect()

# _Spark_ _Types_

In [0]:
# Import only the type that is used here; a star import hides where names
# come from and can shadow other names in the notebook.
from pyspark.sql.types import ByteType

b = ByteType()

# Enforce Manual Schema

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Explicit schema for the flight summary file: two string columns plus a long
# "count" column that carries a description in its field metadata.
myManualSchema = StructType(
    [
        StructField("DEST_COUNTRY_NAME", StringType(), nullable=True),
        StructField("ORIGIN_COUNTRY_NAME", StringType(), nullable=True),
        StructField(
            "count",
            LongType(),
            nullable=False,
            metadata={"description": "Number of flights"},
        ),
    ]
)
# Read the CSV with the schema supplied up front instead of inferring it.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(myManualSchema)
    .load("/Volumes/workspace/default/kiran_data/2011-summary.csv")
)

In [0]:
df.schema['count'].metadata

In [0]:
from pyspark.sql.functions import expr
# expr("(((count + 5) * 200) - 6) < 500")

In [0]:
df.columns

In [0]:
df.first()

In [0]:
# in Python
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
# Build a one-row DataFrame by hand: a Row of positional values plus an
# explicit schema that names and types the three columns.
myManualSchema = StructType(
    [
        StructField("some", StringType(), nullable=True),
        StructField("col", StringType(), nullable=True),
        StructField("names", LongType(), nullable=False),
    ]
)
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame(data=[myRow], schema=myManualSchema)
myDf.show()

In [0]:
#different ways to refer a column
from pyspark.sql.functions import expr, col, column
# expr(), col() and column() are three ways of referring to the same column;
# selecting all three shows the column three times.
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
).show(4)

In [0]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

In [0]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
.show(2)

In [0]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

In [0]:
df.createOrReplaceTempView("dfTable")

In [0]:
%sql
SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry
FROM dfTable
LIMIT 2

In [0]:
#agg
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

In [0]:
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)

In [0]:
%sql
SELECT *, 1 as One FROM dfTable LIMIT 2

# _Adding Columns_

In [0]:
df.withColumn("isGreaterthan100", expr("count > 100")).show(2)

In [0]:
%sql
select *, (count > 100) as isGreaterthan100 from dfTable limit 2

In [0]:
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

# Rename a col

In [0]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

In [0]:
# Add a copy of ORIGIN_COUNTRY_NAME under a name that contains spaces and a hyphen.
dfWithLongColName = df.withColumn("This Long Column-Name", expr("ORIGIN_COUNTRY_NAME"))


In [0]:
# Inside expression strings the column name is wrapped in backticks because
# it contains spaces and a hyphen.
(
    dfWithLongColName
    .selectExpr(
        "`This Long Column-Name`",
        "`This Long Column-Name` as `new col`",
    )
    .show(2)
)

In [0]:
dfWithLongColName.createOrReplaceTempView("dfTableLong")

In [0]:
%sql
SELECT `This Long Column-Name`, `This Long Column-Name` as `new col`
FROM dfTableLong LIMIT 2

In [0]:
df.show()

# Drop col

In [0]:
df.drop("ORIGIN_COUNTRY_NAME").columns

In [0]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

In [0]:
dfWithLongColName.show(2)

# type cast

In [0]:
df.dtypes

In [0]:
df.withColumn("count", col("count").cast("float")).dtypes

# Filter

In [0]:
df.filter(col("count") > 1).show(2)

In [0]:
df.where(col("count")>1).show(3)


In [0]:
#Spark automatically performs all filtering operations at the same time regardless of the filter ordering. This means that if you want to specify multiple AND filters, just chain them sequentially and let Spark handle the rest:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)

In [0]:
%sql
-- String literals use single quotes: under ANSI double-quoted-identifier
-- settings "Croatia" would be read as a column name rather than a string.
SELECT * FROM dfTable WHERE count < 2 AND ORIGIN_COUNTRY_NAME != 'Croatia'
LIMIT 2

# Distinct

In [0]:
df.select("DEST_COUNTRY_NAME").distinct().count()

In [0]:
#distinct applies to the combination of both columns together, not each column separately.
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count() 

In [0]:
# NOTE(review): presumably the separate distinct counts of the two country
# columns added together, for comparison with the combined distinct count
# computed in the previous cell — TODO confirm where 129 and 127 come from.
129+127

In [0]:
%sql
select count(distinct (ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) from dfTable

# Random Samples

In [0]:
# Draw a seeded random sample of rows: no replacement, expected fraction 0.5.
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement=withReplacement, fraction=fraction, seed=seed).show()

In [0]:
df.show(2)

In [0]:
# Split the rows randomly with weights 0.25 / 0.75 (using the `seed` defined
# for sampling), then check whether the first split is larger than the second.
dataFrames = df.randomSplit(weights=[0.25, 0.75], seed=seed)
dataFrames[0].count() > dataFrames[1].count()

In [0]:
dataFrames[0].show(2)

In [0]:
dataFrames[1].show(2)

# Union 

In [0]:
df =df.withColumn("count", col("count").cast("long"))

In [0]:
df.dtypes

In [0]:
# in Python
from pyspark.sql import Row
# Build two extra rows that match df's schema so they can be unioned with it.
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1),
]
newDF = spark.createDataFrame(newRows, schema)

# Append the new rows, then keep only rows whose count is 1 and whose origin
# is not the United States.
union_df = (
    df.union(newDF)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
)

In [0]:
union_df.where(col("DEST_COUNTRY_NAME") == "New Country 2").show()

# sorting

In [0]:
union_df.orderBy("DEST_COUNTRY_NAME", "count").show()

In [0]:
union_df.sort("DEST_COUNTRY_NAME").show()

In [0]:
from pyspark.sql.functions import desc, asc

# expr("count desc") does NOT sort descending: the expression parser reads it
# as the column `count` aliased to `desc`, so rows come back in ascending
# order. desc() produces a real descending sort expression.
df.orderBy(desc("count")).show(2)

In [0]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

In [0]:
%sql
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2

In [0]:
# Top 6 rows by count. Column.desc() is used because expr("count desc") is
# parsed as `count AS desc` and would sort ascending.
df.orderBy(col("count").desc()).limit(6).show()

# Repartition and Coalesce

_**Repartition will incur a full shuffle of the data, regardless of whether one is necessary.
This means that you should typically only repartition when the future number of partitions
is greater than your current number of partitions or when you are looking to
partition by a set of columns:**_

In [0]:
# df.rdd.getNumPartitions()  # returns the current partition count; the method takes no arguments

In [0]:
df.repartition(5)

In [0]:
# If you know that you're going to be filtering by a certain column often,
# it can be worth repartitioning based on that column:
df.repartition(col("DEST_COUNTRY_NAME"))
# or, with an explicit number of partitions:
df.repartition(5, col("DEST_COUNTRY_NAME"))
# repartition() creates a new DataFrame with the specified number of partitions.
# If you pass columns, Spark shuffles the data so that rows with the same column
# value are likely to end up in the same partition: rows with the same
# DEST_COUNTRY_NAME tend to stay together. This is useful for operations like
# groupBy or joins to minimize shuffling later.
# Neither call above is assigned to a variable, so `df` itself is unchanged.

_**Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions.
The operation below shuffles your data into five partitions based on the destination
country name, and then coalesces them (without a full shuffle). Coalesce reduces the number of partitions without a full shuffle.**_

_**It is more efficient than repartition if you only want to decrease partitions: there is no full shuffle, so it is faster, but it can lead to uneven partition sizes.**_

In [0]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

After coalesce, the partitioning based on DEST_COUNTRY_NAME may no longer be perfectly preserved because coalesce just merges existing partitions.

Typically, you use this pattern if:

- _You shuffle by a column first for some distributed operation_
- _Then reduce partitions for writing to disk efficiently (like 2 output files instead of 5)_

In [0]:
# Shuffle by DEST_COUNTRY_NAME for efficient groupBy
# df2 = df.repartition(5, col("DEST_COUNTRY_NAME"))

# Reduce to 2 partitions before writing
# df2.coalesce(2).write.csv("/path/to/output")


# Collecting Rows to the Driver

Spark maintains the state of the cluster in the
driver. There are times when you’ll want to collect some of your data to the driver in
order to manipulate it on your local machine.

In [0]:
# Different ways of bringing a small slice of the DataFrame to the driver.
collectDF = df.limit(10)  # restrict to 10 rows before pulling anything to the driver
collectDF.take(5)  # take works with an Integer count
collectDF.show()  # this prints it out nicely
collectDF.show(5, False)  # 5 rows, with truncation of long values turned off
collectDF.collect()  # returns all rows of collectDF as a list of Row objects

In [0]:
collectDF.show()