<a href="https://colab.research.google.com/github/smruthyunjaya05/MLE/blob/main/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# `min` and `max` are deliberately not imported by name: doing so would shadow
# the Python builtins for every cell that runs afterwards in this session.
from pyspark.sql.functions import col, expr

# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("PercentilesExample").getOrCreate()

# Example DataFrame: a single numeric column named "values"
data = [(10,), (20,), (15,), (30,), (45,), (50,), (60,), (70,), (80,), (90,)]
df = spark.createDataFrame(data, ["values"])

# Min and Max (namespaced through F so the builtins stay usable)
df.select(F.min("values"), F.max("values")).show()

# Percentiles (25, 50, 75).
# The last argument is the relative error; 0.0 asks for exact quantiles, which
# is fine for ten rows but can be expensive on large data.
percentiles = df.approxQuantile("values", [0.25, 0.5, 0.75], 0.0)
q25, median, q75 = percentiles
print("25th:", q25)
print("Median:", median)
print("75th:", q75)


In [None]:
from pyspark.sql import functions as F

# Example DataFrame: one categorical column named "city"
data = [("Delhi",), ("Mumbai",), ("Delhi",), ("Chennai",), ("Kolkata",),
        ("Delhi",), ("Mumbai",), ("Bangalore",), ("Chennai",), ("Delhi",)]
df = spark.createDataFrame(data, ["city"])

# Find top 2 frequent values.
# Several cities can share the same count (here Mumbai and Chennai both appear
# twice), so the city name is used as a secondary sort key. Without it, which
# of the tied cities survives limit(2) is not guaranteed to be stable.
city_counts = df.groupBy("city").count()
ranked = city_counts.orderBy(F.desc("count"), "city")
top2 = [row["city"] for row in ranked.limit(2).collect()]

# Replace others with "Other"
df = df.withColumn(
    "city",
    F.when(F.col("city").isin(top2), F.col("city")).otherwise("Other"),
)

df.show()


In [None]:
# Sample rows; None marks a missing value in "age" or "city".
data = [
    ("A", 23, "Delhi"),
    ("B", None, "Mumbai"),
    ("C", 35, None),
    ("D", None, "Chennai"),
]
df = spark.createDataFrame(data, schema=["name", "age", "city"])

print("Before dropping:")
df.show()

# Keep only the rows whose "age" is present; a NULL in "city" is tolerated
# because only "age" is listed in the subset.
df_clean = df.dropna(subset=["age"])

print("After dropping:")
df_clean.show()


In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session (getOrCreate reuses one that is already running)
spark = SparkSession.builder.appName("RenameColumns").getOrCreate()

# Example DataFrame
data = [(1, "Alice", 25), (2, "Bob", 30)]
df = spark.createDataFrame(data, schema=["id", "name", "age"])

print("Before rename:")
df.show()

# Old and new column names, paired by position
old_cols = ["id", "name", "age"]
new_cols = ["user_id", "full_name", "user_age"]

# Apply the renames one at a time, in list order
rename_map = dict(zip(old_cols, new_cols))
for current_name, target_name in rename_map.items():
    df = df.withColumnRenamed(current_name, target_name)

print("After rename:")
df.show()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Obtain the Spark session (getOrCreate reuses one that is already running)
spark = SparkSession.builder.appName("ExtractItems").getOrCreate()

# Example DataFrame with an array column
data = [
    (1, ["a", "b", "c", "d"]),
    (2, ["x", "y", "z"]),
    (3, ["p", "q"]),
]
df = spark.createDataFrame(data, schema=["id", "items"])

print("Original DataFrame:")
df.show(truncate=False)

# Pull out the elements at positions 0 and 1 of the array (0-based indexing)
items = F.col("items")
df = df.withColumn("first_item", items[0])
df = df.withColumn("second_item", items[1])

print("After extracting positions:")
df.show(truncate=False)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Obtain the Spark session (getOrCreate reuses one that is already running)
spark = SparkSession.builder.appName("MSEExample").getOrCreate()

# Example DataFrame: each row is a (truth, prediction) pair
data = [
    (1.0, 1.2),
    (2.0, 1.8),
    (3.0, 2.5),
    (4.0, 4.2),
]
df = spark.createDataFrame(data, schema=["truth", "prediction"])

# Mean Squared Error: average over all rows of (truth - prediction) squared
residual = F.col("truth") - F.col("prediction")
squared_error = residual ** 2
mse_df = df.select(F.mean(squared_error).alias("mse"))

mse_df.show()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("SecondOrderDiff").getOrCreate()

# Example DataFrame
data = [(1,), (4,), (9,), (16,), (25,)]
df = spark.createDataFrame(data, ["value"])

print("Original Data:")
df.show()

# One ordered window shared by both lag() calls below. No explicit row frame
# is attached: lag() addresses the previous row by offset on its own.
# The sequence order is taken from "value" itself, so this assumes the series
# is meant to be read in ascending value order.
# NOTE: without partitionBy, Spark evaluates the window on a single partition;
# acceptable for a demo, a bottleneck on large data.
w = Window.orderBy("value")

# First-order difference (NULL on the first row, which has no predecessor)
df = df.withColumn("first_diff", F.col("value") - F.lag("value").over(w))

# Second-order difference (difference of first differences);
# NULL on the first two rows.
df = df.withColumn(
    "second_diff",
    F.col("first_diff") - F.lag("first_diff").over(w),
)

print("After computing differences:")
df.show()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Obtain the Spark session (getOrCreate reuses one that is already running)
spark = SparkSession.builder.appName("PivotExample").getOrCreate()

# Example DataFrame in long format: one row per (name, month) with its sales
columns = ["name", "month", "sales"]
data = [
    ("A", "Jan", 100),
    ("A", "Feb", 150),
    ("B", "Jan", 200),
    ("B", "Feb", 250),
    ("C", "Jan", 300),
]
df = spark.createDataFrame(data, schema=columns)

print("Original Data:")
df.show()
