# PySpark DataFrame

## Initialize

In [None]:
from pyspark.sql import SparkSession

# Build the SparkSession used by every later cell (or reuse the active one).
# The app name and the config key/value are placeholders to replace with
# real settings.
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


## DataFrame Creation

In [None]:
from pyspark.sql import Row

# From a local list of Row objects; column names come from the Row fields.
data = [
    Row(name="Alice", age=25),
    Row(name="Bob", age=30),
]
df = spark.createDataFrame(data)

# From an RDD built from the same rows.
rdd = spark.sparkContext.parallelize(data)
df_from_rdd = spark.createDataFrame(rdd)

# From files -- "file_path" is a placeholder for a real location.
# CSV: treat the first line as the header and infer column types.
csv_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("file_path")
)
json_df = spark.read.format("json").load("file_path")
parquet_df = spark.read.format("parquet").load("file_path")


### DataFrame Operations

In [None]:
# Print the first rows as a table
df.show()

# Inspect the schema tree and the (column, type) pairs
df.printSchema()
df.dtypes

# Project two columns
df.select(df["name"], df["age"]).show()

# Keep rows whose age is greater than 25 (where() is an alias of filter())
df.where(df["age"] > 25).show()

# Add a derived column (a column with an existing name would be replaced)
df.withColumn("new_col", df["age"] + 5).show()

# Return the data without the age column
df.drop("age").show()

# Average age per name
df.groupBy("name").agg({"age": "mean"}).show()

# Oldest first (sort() is an alias of orderBy())
df.sort(df["age"].desc()).show()


### Writing DataFrames

In [None]:
# Each format is written to its own directory. Writing all three to one path
# fails: the default save mode ("errorifexists") refuses a path that already
# exists, so the second write would raise. mode="overwrite" also makes the
# cell safe to re-run.

# Save as CSV (with a header line)
df.write.csv("output_path/csv", header=True, mode="overwrite")

# Save as JSON
df.write.json("output_path/json", mode="overwrite")

# Save as Parquet
df.write.parquet("output_path/parquet", mode="overwrite")


## RDD Operations

In [None]:
# Create an RDD holding the integers 1..5
rdd = spark.sparkContext.parallelize(list(range(1, 6)))

# Transformations are lazy: they only describe new RDDs.
mapped_rdd = rdd.map(lambda value: value * 2)       # 2, 4, 6, 8, 10
filtered_rdd = rdd.filter(lambda value: value > 2)  # 3, 4, 5

# Actions run a job and bring results back to the driver.
collected = rdd.collect()    # [1, 2, 3, 4, 5]
count = rdd.count()          # 5
first_element = rdd.first()  # 1

# Combine all elements pairwise into a single total.
sum_all = rdd.reduce(lambda total, value: total + value)  # 15


## SQL Queries

In [None]:
# Expose df to Spark SQL under the view name "people" (replacing any
# existing view of that name).
df.createOrReplaceTempView("people")

# Query the view with plain SQL; the result is another DataFrame.
query_text = "SELECT name, age FROM people WHERE age > 25"
result_df = spark.sql(query_text)
result_df.show()


## ML Functions

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Sample training data. The earlier `df` only has name/age columns, so the
# assembler below would fail on it. Build a frame that actually contains the
# feature and label columns; here label = 2*feature1 + 3*feature2 + 1.
_feature_pairs = [
    (1.0, 2.0), (2.0, 1.0), (3.0, 4.0), (4.0, 3.0), (5.0, 6.0),
    (6.0, 5.0), (7.0, 8.0), (8.0, 7.0), (9.0, 10.0), (10.0, 9.0),
]
ml_df = spark.createDataFrame(
    [(f1, f2, 2.0 * f1 + 3.0 * f2 + 1.0) for f1, f2 in _feature_pairs],
    ["feature1", "feature2", "label"],
)

# Prepare data: combine the feature columns into the single vector column
# that Spark ML estimators expect.
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
data = assembler.transform(ml_df).select("features", "label")

# Train-test split (seeded so the split is reproducible between runs)
train, test = data.randomSplit([0.8, 0.2], seed=42)

# Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(train)

# Predictions on the held-out rows
predictions = model.transform(test)
predictions.show()


## Streaming

In [None]:
from pyspark.sql.types import StructType

# Explicit schema for the incoming CSV files: name (string), age (integer).
schema = (
    StructType()
    .add("name", "string")
    .add("age", "integer")
)

# Source: CSV files read as a stream from "streaming_input_path".
stream_df = spark.readStream.schema(schema).csv("streaming_input_path")

# Sink: print the newly appended rows of each micro-batch to the console.
writer = stream_df.writeStream.outputMode("append").format("console")
query = writer.start()

# NOTE: awaitTermination() with no timeout waits until the query is stopped
# or fails, so later notebook cells do not run while the stream is active.
query.awaitTermination()


## Configuration

In [None]:
# Set Spark Configurations at runtime for this session.
# "spark.sql.shuffle.partitions" is the number of partitions Spark SQL uses
# when shuffling data (e.g. for joins and aggregations).
shuffle_partitions = 10
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)


## UDFs (User-Defined Functions)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


# Define a UDF
def square(x):
    """Return ``x * x``, or None when ``x`` is None.

    SQL NULL values reach a Python UDF as None. Without the guard,
    ``None * None`` raises TypeError inside the worker and fails the job.
    """
    if x is None:
        return None
    return x * x


square_udf = udf(square, IntegerType())

# Apply UDF. For simple arithmetic like this, the built-in column expression
# ``df.age * df.age`` does the same without running Python code per row.
df.withColumn("squared", square_udf(df.age)).show()


## SQL Functions

In [None]:
from pyspark.sql.functions import avg, col, count, lit, when
# Spark's sum is imported under an alias so Python's built-in sum() is not
# shadowed for the rest of the notebook.
from pyspark.sql.functions import sum as spark_sum

# Column arithmetic
df.select(col("age") + 1).show()

# Conditional column: "Yes" when age >= 18, otherwise "No"
df.withColumn(
    "is_adult", when(col("age") >= 18, lit("Yes")).otherwise(lit("No"))
).show()

# Row count and total age per name
df.groupBy("name").agg(
    count("*").alias("count"),
    spark_sum("age").alias("total_age"),
).show()


### Broadcast

In [None]:
# Send a read-only copy of this list to the executors once; tasks read it
# through ``broadcasted_var.value``.
broadcast_payload = [1, 2, 3]
broadcasted_var = spark.sparkContext.broadcast(broadcast_payload)


### Accumulator

In [None]:
# Shared counter starting at 0: tasks add to it (``accumulator.add(n)``) and
# the driver reads the total from ``accumulator.value``.
initial_total = 0
accumulator = spark.sparkContext.accumulator(initial_total)


### Checkpoint

In [None]:
# Directory where checkpointed RDD data is written.
spark.sparkContext.setCheckpointDir("checkpoint_dir")

# checkpoint() must be called before any job has run on an RDD. The `rdd`
# from the RDD section has already been collected/counted, so a fresh RDD is
# used here.
checkpointed_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
checkpointed_rdd.checkpoint()

# Checkpointing is lazy: the data is only written when an action runs.
checkpointed_rdd.count()


# PySpark Complex Data Manipulation

In [None]:
# Import required modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, avg, count
# Spark's max is imported under an alias so it does not shadow Python's
# built-in max() for the rest of the notebook.
from pyspark.sql.functions import max as spark_max

# Step 1: Initialize Spark Session
# getOrCreate() returns the already-active session if there is one, so this
# is safe to run after the "Initialize" cell.
spark = SparkSession.builder \
    .appName("Data Manipulation Example") \
    .getOrCreate()

# Step 2: Load Data
# Create a sample DataFrame for demonstration: (Name, Age, Gender, Salary)
data = [
    ("Alice", 25, "F", 5000),
    ("Bob", 30, "M", 6000),
    ("Charlie", 35, "M", 7000),
    ("Diana", 40, "F", 8000),
    ("Eve", 45, "F", 9000),
]
columns = ["Name", "Age", "Gender", "Salary"]

# Convert the Python list into a PySpark DataFrame
df = spark.createDataFrame(data, schema=columns)

# Display the data
print("Original Data:")
df.show()

# Step 3: Transformations

# 3.1: Add an "Age Group" column. Conditions are checked in order:
# Age < 30 -> "Young", 30..40 (between() is inclusive) -> "Middle-aged",
# anything else -> "Senior".
df = df.withColumn(
    "Age Group",
    when(col("Age") < 30, "Young")
    .when(col("Age").between(30, 40), "Middle-aged")
    .otherwise("Senior")
)

# 3.2: Filter rows
# Keep only rows where Salary > 6000
filtered_df = df.filter(col("Salary") > 6000)

# 3.3: Aggregate data
# Group by Gender and calculate average Salary and maximum Age
aggregated_df = df.groupBy("Gender").agg(
    avg("Salary").alias("Avg Salary"),
    spark_max("Age").alias("Max Age")
)

# 3.4: Rename a column
# Rename the column "Salary" to "Income"
renamed_df = df.withColumnRenamed("Salary", "Income")

# 3.5: Drop a column
# Drop the "Age Group" column
dropped_df = df.drop("Age Group")

# Step 4: Save the manipulated data
# Write the filtered DataFrame to a CSV directory (example path; it must be
# writable). mode="overwrite" replaces output from a previous run.
filtered_df.write.csv("output/filtered_data", header=True, mode="overwrite")

# Step 5: Display Results

print("Data with Age Group column added:")
df.show()

print("Filtered Data (Salary > 6000):")
filtered_df.show()

print("Aggregated Data (Average Salary and Max Age by Gender):")
aggregated_df.show()

print("Data with Renamed Column (Salary -> Income):")
renamed_df.show()

print("Data with Age Group Column Dropped:")
dropped_df.show()

# The session is intentionally NOT stopped here: every later cell still uses
# `spark`, and calling spark.stop() at this point would make them fail.
# Call spark.stop() once, at the very end of the notebook.


## Joins

In [None]:
# Sample DataFrames sharing a "Name" key. df1/df2 are not defined by any
# earlier cell, so they are created here. Alice and Bob appear in both,
# Charlie only in df1 and Diana only in df2, so the three joins differ.
df1 = spark.createDataFrame(
    [("Alice", 25), ("Bob", 30), ("Charlie", 35)],
    ["Name", "Age"],
)
df2 = spark.createDataFrame(
    [("Alice", "HR"), ("Bob", "IT"), ("Diana", "Finance")],
    ["Name", "Department"],
)

# Inner Join: only Names present in both (Alice, Bob)
inner_join_df = df1.join(df2, on="Name", how="inner")

# Left Outer Join: every df1 row; Department is null for Charlie
left_join_df = df1.join(df2, on="Name", how="left")

# Full Outer Join: rows from both sides, nulls where there is no match
full_join_df = df1.join(df2, on="Name", how="outer")

# Display results
print("Inner Join Result:")
inner_join_df.show()

print("Left Outer Join Result:")
left_join_df.show()

print("Full Outer Join Result:")
full_join_df.show()

## Window Functions

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number

# Sample DataFrame: (Name, Department, Salary)
data = [
    ("Alice", "HR", 5000),
    ("Bob", "IT", 6000),
    ("Charlie", "HR", 4500),
    ("Diana", "IT", 7000),
    ("Eve", "HR", 5500),
]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Rank rows separately within each Department, lowest Salary first.
window_spec = Window.partitionBy("Department").orderBy("Salary")

# Add one column per ranking function, in this order. On ties, rank() leaves
# gaps, dense_rank() does not, and row_number() always gives distinct numbers.
for new_col, ranking_fn in (
    ("Rank", rank),
    ("Dense Rank", dense_rank),
    ("Row Number", row_number),
):
    df = df.withColumn(new_col, ranking_fn().over(window_spec))

print("Data with Window Functions:")
df.show()


## Pivoting Data

In [None]:
# Sample DataFrame: (Name, Department, Salary)
data = [
    ("Alice", "HR", 5000),
    ("Bob", "IT", 6000),
    ("Charlie", "HR", 4500),
    ("Diana", "IT", 7000),
    ("Eve", "HR", 5500),
]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# One row per Department and one column per distinct Name. Each cell holds
# that Name's summed Salary in that Department (null when there is none).
# No explicit pivot values are passed, so Spark first determines the
# distinct Names itself.
by_department = df.groupBy("Department")
pivot_df = by_department.pivot("Name").sum("Salary")

print("Pivoted DataFrame:")
pivot_df.show()


## Dealing with Nulls

In [None]:
# Sample DataFrame with one null in each column
data = [
    ("Alice", None, 5000),
    ("Bob", "IT", None),
    ("Charlie", "HR", 4500),
    (None, "IT", 7000),
]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Fill nulls per column: Name -> "Unknown", Salary -> 0. Department is not
# listed, so Alice's null Department is left as is.
fill_values = {"Name": "Unknown", "Salary": 0}
df_filled = df.fillna(fill_values)

# Drop every row containing at least one null; only Charlie's row has none.
df_dropped = df.dropna()

# Replace the value "IT" with "Information Technology". This is value
# replacement, not null handling: null cells are left untouched.
df_replaced = df.na.replace({"IT": "Information Technology"})

# Display results
for heading, frame in (
    ("DataFrame with Nulls Filled:", df_filled),
    ("DataFrame with Nulls Dropped:", df_dropped),
    ("DataFrame with Replaced Values:", df_replaced),
):
    print(heading)
    frame.show()


## Explode Array

In [None]:
# col is imported here so this cell does not depend on an earlier cell's import.
from pyspark.sql.functions import col, explode, explode_outer

# Sample DataFrame with an array column; Charlie's array is empty.
data = [("Alice", [5000, 6000]), ("Bob", [7000, 8000]), ("Charlie", [])]
columns = ["Name", "Salaries"]

df = spark.createDataFrame(data, columns)

# explode(): one output row per array element. A row whose array is empty or
# null produces no output rows, so Charlie is missing from this result.
exploded_df = df.withColumn("Salary", explode(col("Salaries")))

# explode_outer(): same, but an empty or null array yields a single row with a
# null Salary, so Charlie is kept.
exploded_outer_df = df.withColumn("Salary", explode_outer(col("Salaries")))

# Display results
print("Exploded DataFrame:")
exploded_df.show()

print("Exploded DataFrame (explode_outer keeps empty arrays):")
exploded_outer_df.show()


## Union

In [None]:
# Two sample DataFrames with the same (Name, Age) columns
data1 = [("Alice", 25), ("Bob", 30)]
data2 = [("Charlie", 35), ("Diana", 40)]
schema_cols = ["Name", "Age"]

df1, df2 = (spark.createDataFrame(rows, schema_cols) for rows in (data1, data2))

# union() stacks the rows of df2 under df1. Columns are matched by position
# and duplicates are kept (like SQL UNION ALL); unionByName() matches by name.
union_df = df1.union(df2)

print("Union of DataFrames:")
union_df.show()


## Sort

In [None]:
# Sample DataFrame: (Name, Age)
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Oldest first: sort by Age descending (sort() is an alias of orderBy()).
sorted_df = df.sort(df["Age"].desc())

print("Sorted DataFrame:")
sorted_df.show()


## Duplicates

In [None]:
# Sample DataFrame: ("Alice", 25) appears twice
data = [("Alice", 25), ("Bob", 30), ("Alice", 25), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Remove rows that are identical in every column.
df_no_duplicates = df.dropDuplicates()

# Keep one row per Name, considering only the listed columns.
name_key = ["Name"]
df_no_duplicates_cols = df.dropDuplicates(name_key)

# Display results
for heading, frame in (
    ("DataFrame without Duplicates:", df_no_duplicates),
    ("DataFrame without Duplicates (Based on Name):", df_no_duplicates_cols),
):
    print(heading)
    frame.show()


## Cross Join

In [None]:
# One-column sample DataFrames
data1 = [(name,) for name in ("Alice", "Bob")]
data2 = [(dept,) for dept in ("HR", "IT")]

df1 = spark.createDataFrame(data1, ["Name"])
df2 = spark.createDataFrame(data2, ["Department"])

# Cartesian product: every Name paired with every Department (2 x 2 = 4 rows).
cross_join_df = df1.crossJoin(df2)

print("Cross Join DataFrame:")
cross_join_df.show()


## Sampling

In [None]:
# Sample DataFrame: (Name, Age)
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Diana", 40)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Sample about half the rows, without replacement. The fraction is not an
# exact row count; the fixed seed makes the draw repeatable for the same input.
sampled_df = df.sample(False, 0.5, 42)

print("Sampled DataFrame:")
sampled_df.show()
