# First, Create Your DataFrame

In [0]:
# Load the TPC-H sample `orders` table into a DataFrame
# (spark.read.table is equivalent to spark.table)
df = spark.read.table('samples.tpch.orders')

## Basics

In [0]:
# Show a preview of the DataFrame in the notebook output
df.display()

In [0]:
# Show preview of first / last n rows
# head(n) and tail(n) each return a list of Row objects on the driver.
# A notebook cell only echoes its last expression, so print both explicitly;
# otherwise the head() result is silently discarded.
print(df.head(5))
print(df.tail(5))

In [0]:
# Limit actual DataFrame to n rows (non-deterministic)
# NOTE: this rebinds `df`, so every cell run after this one sees at most 5 rows.
df = df.limit(5)
df.display()

In [0]:
# Get columns (a list of column-name strings)
df.columns

In [0]:
# Get columns + column types (a list of (name, type-string) tuples)
df.dtypes

In [0]:
# Get schema (a StructType with one StructField per column)
df.schema

In [0]:
# Get row count
# NOTE: count() is an action, so it runs a Spark job over the DataFrame
df.count()

In [0]:
# Get column count (the number of entries in df.columns)
len(df.columns)

In [0]:
# Write the DataFrame as CSV files under the given volume path.
# By default no header row is written (pass header=True for one), and the save
# fails if the target path already exists (see the writer's `mode` option).
df.write.format('csv').save('/path/to/your/output/volume')

In [0]:
# Get results (WARNING: in-memory) as list of PySpark Rows (only limit this to 5 rows)
# take(n) is shorthand for limit(n).collect(): the rows are pulled onto the driver.
df_list = df.take(5)

print(df_list)

# Common Patterns

### Importing Functions & Types

In [0]:
# Easily reference these as F.my_function() and T.my_type() below
from pyspark.sql import functions as F, types as T

### Filtering

In [0]:
# Keep only rows whose order status equals 'F' (where() is an alias of filter())
df = df.where(F.col('o_orderstatus') == 'F')

In [0]:
# Comparison operators (>, <, >=, <=) build filter conditions the same way
df = df.where(F.col('o_totalprice') > 100000)

In [0]:
# Combine conditions with & (and), | (or), ~ (not).
# Wrap each condition in parentheses: in Python, & binds tighter than > and ==.
df = df.where((F.col('o_totalprice') > 100000) & (F.col('o_orderstatus') == 'F'))

In [0]:
# Filter orders where order priority is one of these values
# Column helpers come from the F alias (`from pyspark.sql import functions as F`);
# a bare `col` is never imported in this notebook.
df = df.filter(F.col('o_orderpriority').isin(['1-URGENT', '2-HIGH', '3-MEDIUM']))

In [0]:
# Sort results by order date: ascending, or descending
df = df.orderBy(df.o_orderdate.asc())
df = df.orderBy(df.o_orderdate.desc())

### Joins

In [0]:
# Left join orders with customers on customer key
# (every row of df is kept, even when no customer matches)
customer_df = spark.table('samples.tpch.customer')
df = df.join(customer_df, on=df.o_custkey == customer_df.c_custkey, how='left')

In [0]:
# Match orders.o_custkey to customer.c_custkey (explicit column match)
# NOTE: this is the same join as the cell above; run only one of the two.
# Running both would add the customer columns to df twice, and later
# references to them would then be ambiguous.
customer_df = spark.table('samples.tpch.customer')
df = df.join(customer_df, df['o_custkey'] == customer_df['c_custkey'], how='left')

In [0]:
# Join orders and lineitem on multiple conditions (example)
# Combine the conditions with & and wrap each one in parentheses.
# Per the TPC-H spec, a line item always ships after its order date, so an
# equality test between l_shipdate and o_orderdate would never match. The
# `>=` condition keeps every line item of each order while still showing a
# multi-condition join.
lineitem_df = spark.table('samples.tpch.lineitem')
df = df.join(
    lineitem_df,
    (df.o_orderkey == lineitem_df.l_orderkey) & (lineitem_df.l_shipdate >= df.o_orderdate),
    'left',
)

### Column Operations

In [0]:
# Add a new static column indicating review status
# F.lit wraps a constant so every row gets the same value
df = df.withColumn('review_status', F.lit('REVIEWED'))

In [0]:
# Build 'order_summary' as "<priority> - <status>" when both values are present,
# and fall back to 'N/A' when either one is null
df = df.withColumn(
    'order_summary',
    F.when(
        F.col('o_orderpriority').isNotNull() & F.col('o_orderstatus').isNotNull(),
        F.concat(F.col('o_orderpriority'), F.lit(' - '), F.col('o_orderstatus')),
    ).otherwise(F.lit('N/A')),
)

In [0]:
# Keep only the listed columns; .alias() gives a column a new name in the result
df = df.select(
    F.col('o_orderkey'),
    F.col('o_custkey'),
    df['o_orderdate'].alias('order_date'),
)

In [0]:
# Drop columns that are not needed
# (string column names that are not present are ignored rather than raising)
df = df.drop('o_clerk', 'o_comment')

In [0]:
# Rename a column
# (withColumnRenamed is a no-op if the old name is not in the schema)
new_df = df.withColumnRenamed('o_orderdate', 'order_date')

# Display the resulting DataFrame
new_df.display()

In [0]:
# Select only the columns of df whose names also appear in customer_df
new_df = df.select([F.col(name) for name in customer_df.columns if name in df.columns])

# Display the resulting DataFrame
new_df.display()

In [0]:
# Batch Rename/Clean Columns: lower-case every name and turn spaces and
# hyphens into underscores
new_df = df
for col_name in df.columns:
    new_col_name = col_name.lower().translate(str.maketrans({' ': '_', '-': '_'}))
    new_df = new_df.withColumnRenamed(col_name, new_col_name)

# Display the resulting DataFrame
new_df.display()

### Casting & Coalescing Null Values & Duplicates

In [0]:
# Cast o_totalprice to double ('double' is the type-name form of T.DoubleType())
new_df = df.withColumn('o_totalprice', F.col('o_totalprice').cast('double'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Replace nulls per column: each key is a column name, each value its replacement
# (df.na.fill is the same operation as df.fillna)
new_df = df.na.fill({'o_clerk': 'UNKNOWN', 'o_totalprice': 0})

# Display the resulting DataFrame
new_df.display()

In [0]:
# coalesce picks the first non-null argument: o_clerk, then o_comment, else 'N/A'
new_df = df.withColumn(
    'first_nonnull_field',
    F.coalesce(F.col('o_clerk'), F.col('o_comment'), F.lit('N/A')),
)

# Display the resulting DataFrame
new_df.display()

In [0]:
# Remove fully duplicated rows; with no arguments, distinct() and
# dropDuplicates() do the same thing
new_df = df.distinct()  # or: df.dropDuplicates()

# Display the resulting DataFrame
new_df.display()

In [0]:
# Keep one row per (o_orderdate, o_custkey) pair
# (which of the duplicate rows survives is not specified)
new_df = df.drop_duplicates(subset=['o_orderdate', 'o_custkey'])

# Display the resulting DataFrame
new_df.display()

### String Operations

In [0]:
# Substring match: keep rows whose o_clerk contains the letter 'o'
new_df = df.filter(F.col('o_clerk').contains('o'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Filter rows where o_orderpriority starts with '1-U' (i.e. '1-URGENT')
new_df = df.filter(df.o_orderpriority.startswith('1-U'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Filter rows where o_clerk ends with '39'
new_df = df.filter(df.o_clerk.endswith('39'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Keep only rows with no o_clerk value (use isNull(), not == None)
new_df = df.filter(F.col('o_clerk').isNull())

# Display the resulting DataFrame
new_df.display()

In [0]:
# Keep only rows that have an o_comment value
new_df = df.filter(F.col('o_comment').isNotNull())

# Display the resulting DataFrame
new_df.display()

In [0]:
# SQL LIKE pattern: % matches any run of characters, _ matches exactly one.
# Keep rows where o_clerk starts with 'Al'.
new_df = df.filter(F.col('o_clerk').like('Al%'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Regex match (Java syntax, matched anywhere in the value; `$` anchors the end).
# Keep rows where o_clerk ends with 'ICE'.
new_df = df.filter(F.col('o_clerk').rlike('[A-Z]*ICE$'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Membership test: isin() accepts either varargs or a single list of values
new_df = df.filter(F.col('o_clerk').isin(['Clerk#000000001', 'Clerk#000000002']))

# Display the resulting DataFrame
new_df.display()

### String Functions

In [0]:
# Extract first 3 characters of o_orderpriority as short_priority
# Spark string positions are 1-based, so the first character is position 1.
new_df = df.withColumn('short_priority', df.o_orderpriority.substr(1, 3))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Strip leading and trailing spaces from o_clerk, overwriting the column
new_df = df.withColumn('o_clerk', F.trim(F.col('o_clerk')))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Left Pad - F.lpad(col, len, pad)
# Right Pad - F.rpad(col, len, pad)
# Right-pad o_orderpriority with '-' up to 15 characters
new_df = df.withColumn('padded_orderpriority', F.rpad(F.col('o_orderpriority'), 15, '-'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Left Trim - F.ltrim(col)
# Right Trim - F.rtrim(col)
# Remove only the leading spaces from o_clerk
new_df = df.withColumn('o_clerk', F.ltrim(df['o_clerk']))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Concatenate - F.concat(*cols)
# (the result is null if any input is null)
new_df = df.withColumn(
    'summary_label',
    F.concat(df['o_orderpriority'], F.lit(' '), df['o_orderstatus']),
)

# Display the resulting DataFrame
new_df.display()

In [0]:
# Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols)
# (unlike concat, null inputs are skipped)
new_df = df.withColumn(
    'summary_label',
    F.concat_ws('-', df['o_orderpriority'], df['o_orderstatus']),
)

# Display the resulting DataFrame
new_df.display()

In [0]:
# Replace '0F1...' patterns in o_orderstatus with '1F1-...' using regex
# regexp_replace uses Java regex replacement syntax: capture group 1 is written
# `$1`. A `\1` in the replacement would be emitted as a literal '1'.
new_df = df.withColumn('o_orderstatus', F.regexp_replace(F.col('o_orderstatus'), '0F1(.*)', '1F1-$1'))

# Display the resulting DataFrame
new_df.display()

### Number Operations

In [0]:
# Round o_totalprice to a whole number (scale 0 = no decimal places)
new_df = df.withColumn('o_totalprice', F.round('o_totalprice', 0))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Round o_totalprice down to the nearest whole number
new_df = df.withColumn('o_totalprice', F.floor('o_totalprice'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Ceiling - F.ceil(col)
# Round o_totalprice up to the next whole number.
# The orders table has no 'price' column, so the price column is o_totalprice.
new_df = df.withColumn('o_totalprice', F.ceil(F.col('o_totalprice')))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Replace o_totalprice with its absolute value
new_df = df.withColumn('o_totalprice', F.abs(df['o_totalprice']))

# Display the resulting DataFrame
new_df.display()

### Date & Timestamp Operations

In [0]:
# Add a column with the current date
# (every call to current_date within one query returns the same value)
new_df = df.withColumn('current_date', F.current_date())

# Display the resulting DataFrame
new_df.display()

In [0]:
# Parse o_orderdate values written as yyyy-MM-dd into a date column
new_df = df.withColumn('o_orderdate', F.to_date('o_orderdate', 'yyyy-MM-dd'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Parse o_orderdate values written as 'yyyy-MM-dd HH:mm:ss' into a timestamp column
new_df = df.withColumn('o_orderdate', F.to_timestamp('o_orderdate', 'yyyy-MM-dd HH:mm:ss'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Date/time part extractors (each takes a column):
#   year:   F.year(col)     month:  F.month(col)     day:    F.dayofmonth(col)
#   hour:   F.hour(col)     minute: F.minute(col)    second: F.second(col)
#   (hour/minute/second are meaningful on timestamp columns)
# Keep only orders placed in 1996
new_df = df.where(F.year('o_orderdate') == 1996)

# Display the resulting DataFrame
new_df.display()

In [0]:
# Shift o_orderdate three days forward and three days back into new columns
new_df = (
    df.withColumn('three_days_after', F.date_add('o_orderdate', 3))
      .withColumn('three_days_before', F.date_sub('o_orderdate', 3))
)

# Display the resulting DataFrame
new_df.display()

In [0]:
# Shift o_orderdate forward by one calendar month
new_df = df.withColumn('next_month', F.add_months('o_orderdate', 1))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Assuming df is a joined orders + lineitem DataFrame:
# datediff(end, start) gives the number of days from o_orderdate to l_shipdate
new_df = df.withColumn('days_between', F.datediff(df['l_shipdate'], df['o_orderdate']))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Assuming df is a joined orders + lineitem DataFrame:
# months_between(date1, date2) is positive when date1 is later than date2
new_df = df.withColumn('months_between', F.months_between(df['l_shipdate'], df['o_orderdate']))

# Display the resulting DataFrame
new_df.display()

In [0]:
# Keep orders dated from 1995-01-01 through 1996-12-31
# (between() is inclusive at both ends)
new_df = df.filter(F.col('o_orderdate').between(F.lit('1995-01-01'), F.lit('1996-12-31')))

# Display the resulting DataFrame
new_df.display()

### Aggregation Operations

In [0]:
# Row count:                F.count('*')   (F.count(col) counts non-null values)
# Sum of values in group:   F.sum(col)
# Mean of values in group:  F.mean(col)    (alias: F.avg)
# Max of values in group:   F.max(col)
# Min of values in group:   F.min(col)
# First value in group:     F.first(col)   (arbitrary unless the input is ordered)

# Group by order priority and compute max total price
new_df = df.groupBy('o_orderpriority').agg(F.max('o_totalprice').alias('max_price_by_priority'))

# Display the resulting DataFrame
new_df.display()

In [0]:
# F.collect_set(col) gathers each group's distinct values into an array;
# F.collect_list(col) does the same but keeps duplicates.
# For every order priority, gather the distinct clerks who handled it
new_df = (
    df.groupBy('o_orderpriority')
      .agg(F.collect_set(F.col('o_clerk')).alias('clerks_per_priority'))
)

# Display the resulting DataFrame
new_df.display()

In [0]:
# Keep only the latest row for each combination (window functions)
from pyspark.sql import Window as W

# One window per (customer, order status) pair, newest order first
window = W.partitionBy("o_custkey", "o_orderstatus").orderBy(F.col("o_orderdate").desc())

# Number the rows in each window and keep the first (most recent) one
new_df = (
    df.withColumn("row_number", F.row_number().over(window))
      .where(F.col("row_number") == 1)
)

# Display the resulting DataFrame
new_df.display()