
## Setup & Session

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
            .appName("app")
            .config("spark.sql.shuffle.partitions", "200")
            .getOrCreate())

    sc = spark.sparkContext

## Common Configs

    spark.conf.set("spark.sql.adaptive.enabled", "true")

    spark.conf.get("spark.sql.shuffle.partitions")

## Data I/O (Delta/Parquet/CSV/JSON/SQL)

    # Read
    df = spark.read.format("delta").load("/path")
    df = spark.read.parquet("/path")
    df = spark.read.csv("/path", header=True, inferSchema=True)
    df = spark.read.json("/path")
    df = spark.read.table("catalog.db.table")          # Unity Catalog or hive_metastore
    df = spark.sql("SELECT * FROM catalog.db.table")   # SQL

    # Write (overwrite/append + partitioning)
    (df.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .partitionBy("dt")
      .save("/path"))

    df.write.saveAsTable("catalog.db.table", format="delta", mode="append")

## DataFrame Basics
    df.printSchema()
    df.show(20, truncate=False)
    df.head(1)                  # collect small
    df.count()                  # action
    df.cache(); df.count()      # materialize
    df.unpersist()
    df.explain(mode="formatted")

- **Selecting & Renaming**

      from pyspark.sql.functions import col, expr

      df.select("a", col("b").alias("b1"), expr("a + b as a_plus_b"))
      df.withColumnRenamed("old", "new")
      df.drop("unwanted_col")

- **Filtering & Ordering**  
      
      df.filter(col("x") > 5)                 # or df.where("x > 5")
      df.orderBy(col("dt").desc(), "id")
      df.dropDuplicates(["key1","key2"])      # distinct on keys

- **Joins**

      # Equi-join on a shared column name (the join column appears once in the result)
      df.join(df2, on="id", how="inner")
      # Explicit multi-column condition
      df.join(df2, (df.id == df2.id) & (df.dt == df2.dt), "left")
      # Put the broadcast hint on the SMALL side: a left join can only
      # broadcast its right input, so hinting `df` would have no effect here.
      df.join(small_df.hint("broadcast"), "key", "left")

- **Aggregations & Grouping**

      from pyspark.sql.functions import sum as _sum, avg, countDistinct

      df.groupBy("k").agg(_sum("amt").alias("total_amt"), avg("score"))
      df.rollup("year","month").sum("sales")
      df.cube("a","b").count()

- **Columns, Expressions & Functions**

      # Alias `round` on import (same idea as `sum as _sum`) so Python's
      # builtin round() is not shadowed for the rest of the session.
      from pyspark.sql.functions import (
        lit, when, coalesce, greatest, least, isnan, isnull, round as _round, expr
      )

      df.select(when(col("x") > 0, "pos").otherwise("neg").alias("sign"))
      df.select(coalesce("a","b","c").alias("first_non_null"))
      df.select(greatest("a","b","c").alias("maxval"))
      df.filter(isnull("a") | isnan("a"))
      df.select(_round(col("price"), 2))
      df.select(expr("case when a>0 then 1 else 0 end").alias("flag"))

- **Strings / Dates / Arrays / Maps**

      from pyspark.sql.functions import (
        upper, lower, trim, regexp_extract, regexp_replace, length, concat_ws,
        to_date, to_timestamp, date_add, date_sub, current_date, datediff,
        split, size, array, explode, map_from_arrays
      )

      # Strings
      df.select(upper("name"), regexp_replace("email", "@.*", "@mask.com"))

      # Dates
      # An alias is not visible to sibling expressions in the same select(),
      # so build the parsed-date Column once and reuse the object.
      dt = to_date("dt_str", "yyyy-MM-dd")
      df.select(dt.alias("dt"),
                datediff(current_date(), dt).alias("age_days"))

      # Arrays & explode
      df.select(split("csv","\\,").alias("arr")).select(explode("arr").alias("val"))

      # Maps
      df.select(map_from_arrays(col("keys"), col("vals")).alias("m"))

## Window Functions

    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, sum as _sum

    w = Window.partitionBy("part").orderBy(col("ts").desc())

    df.select("id","part","ts",
              row_number().over(w).alias("rn"),
              lag("value", 1).over(w).alias("prev_val"),
              _sum("value").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).alias("running_sum"))

## Nulls & Conditionals

    df.fillna({"str_col": "N/A", "num_col": 0})
    df.na.drop(subset=["must_have"])
    df.na.replace({"old": "new"}, subset=["col"])

## UDFs & Pandas UDFs

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf

    @udf(IntegerType())
    def add_one(x):
        """Return x + 1, passing None through unchanged."""
        if x is None:
            return None
        return x + 1

    df.select(add_one("n").alias("n1"))

- **Pandas UDF (vectorized; faster)**

      import pandas as pd
      import pyspark.sql.functions as F
      from pyspark.sql.types import LongType

      @F.pandas_udf(LongType())
      def add_two_udf(s: pd.Series) -> pd.Series:
          """Add 2 to every element of the incoming pandas Series."""
          shifted = s + 2
          return shifted

      df.select(add_two_udf("n").alias("n2"))

## Performance Essentials

    df.repartition(200, "key")     # full shuffle into 200 partitions, partitioned by key
    df.coalesce(10)                # reduce to 10 partitions without a full shuffle
    small = F.broadcast(small_df)  # mark small_df for broadcast when it is joined
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50*1024*1024)  # threshold in bytes (50 MiB)

- Prefer built-in functions over UDFs (Catalyst optimization).
- Use Delta with Z-Ordering for data skipping; vacuum old files.
- Enable AQE (adaptive query execution).
- Cache only if reused; unpersist when done.

## Structured Streaming (Core Pattern)

    from pyspark.sql.functions import expr

    stream_df = (spark.readStream
                .format("cloudFiles")              # Databricks Autoloader, if used
                .option("cloudFiles.format", "json")
                # Needed when the schema is inferred instead of passed via .schema(...);
                # the checkpoint directory is a common choice for this location.
                .option("cloudFiles.schemaLocation", "/chk/stream1")
                .load("/input"))

    # JSON input is parsed into its own columns (there is no Kafka-style
    # `value` column), so re-serialize each row to get one string column `v`.
    transformed = stream_df.selectExpr("to_json(struct(*)) as v")

    query = (transformed.writeStream
            .format("delta")
            .option("checkpointLocation", "/chk/stream1")
            .outputMode("append")
            .start("/output"))

    # query.status, query.lastProgress
    # query.stop()
- Databricks preview:

      display(stream_df, streamName="live")

## Spark SQL

    df.createOrReplaceTempView("t")
    spark.sql("""
      SELECT k, SUM(v) AS s
      FROM t
      WHERE dt >= '2025-01-01'
      GROUP BY k
      HAVING s > 100
      ORDER BY s DESC
    """)

## RDD (when you really need it)

    r = sc.parallelize([1,2,3,4])
    r.map(lambda x: x*x).filter(lambda x: x>4).collect()

## MLlib (Pipelines Quickstart)

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.classification import LogisticRegression

    idx = StringIndexer(inputCol="label_str", outputCol="label")
    vec = VectorAssembler(inputCols=["f1","f2","f3"], outputCol="features")
    lr  = LogisticRegression(featuresCol="features", labelCol="label")

    pipeline = Pipeline(stages=[idx, vec, lr])
    model = pipeline.fit(train_df)
    pred  = model.transform(test_df)

## Databricks-Specific Nice-to-Haves

    display(df)                              # interactive
    dbutils.fs.ls("/mnt/...")                # list files
    dbutils.widgets.text("p_dt","")          # widgets
    dbutils.jobs.taskValues.set(key="k", value="v")   # job orchestration

## Delta Lake Operations

    from delta.tables import DeltaTable

    delta = DeltaTable.forPath(spark, "/path")

    # Upsert (MERGE)
    delta.alias("t").merge(
      source=df.alias("s"),
      condition="t.id = s.id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    # Time travel
    spark.read.format("delta").option("versionAsOf", 3).load("/path")

    # Optimize & Vacuum (Databricks SQL commands)
    spark.sql("OPTIMIZE catalog.db.table ZORDER BY (id)")
    spark.sql("VACUUM catalog.db.table RETAIN 168 HOURS")

## Testing & Validation

    assert df.filter(col("id").isNull()).count() == 0
    assert set(df.columns) >= {"id","dt","amount"}
    df.limit(10).toPandas()  # small sample to inspect

## Debug & Explain

    df.explain(True)                 # or "cost", "extended", "formatted"
    spark.catalog.listTables("db")
    spark.catalog.listColumns("table", "db")   # argument order is (tableName, dbName)

## Handy One-Liners

    # Schema: tree view, then the DDL string
    df.printSchema()
    print(df._jdf.schema().toDDL())  # goes through the private JVM handle; df.schema.simpleString() gives the compact struct<...> form

    # Flatten nested struct fields (example)
    # Selects only the sub-fields of struct columns, renamed to <column>_<field>.
    # Field names come straight from the schema, so no throwaway DataFrame is built.
    from pyspark.sql.functions import col
    flat = df.select(*[col(f"{c}.{a}").alias(f"{c}_{a}")
                      for c, t in df.dtypes if t.startswith("struct")
                      for a in df.schema[c].dataType.fieldNames()])

## Version Checks

    import pyspark, platform
    print(pyspark.__version__, spark.version, platform.python_version())

## Typical Project Layout

    project/
      ├─ notebooks/             # exploration
      ├─ jobs/                  # scheduled entry points
      ├─ src/                   # libs (transformations, utils)
      ├─ tests/                 # unit tests with pytest + chispa
      └─ conf/                  # job configs


















