# Spark DataFrame example

In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 3, Finished, Available, Finished)

### Create a Spark DataFrame

In [2]:
df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 4, Finished, Available, Finished)

In [3]:
df.show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 5, Finished, Available, Finished)

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



### Add a column to a Spark DataFrame

In [4]:
from pyspark.sql.functions import col, when

df1 = df.withColumn(
    "life_stage",
    when(col("age") < 13, "child")
    .when(col("age").between(13, 19), "teenager")
    .otherwise("adult"),
)

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 6, Finished, Available, Finished)

In [5]:
df1.show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 7, Finished, Available, Finished)

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [6]:
df.show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 8, Finished, Available, Finished)

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



### Filter a Spark DataFrame

In [7]:
df1.where(col("life_stage").isin(["teenager", "adult"])).show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 9, Finished, Available, Finished)

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



### Group by aggregation on Spark DataFrame

In [8]:
from pyspark.sql.functions import avg

df1.select(avg("age")).show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 10, Finished, Available, Finished)

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



In [9]:
df1.groupBy("life_stage").avg().show()


StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 11, Finished, Available, Finished)

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+



### Query the DataFrame with SQL

In [10]:
spark.sql("select avg(age) from {df1}", df1=df1).show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 12, Finished, Available, Finished)

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



In [11]:
spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 13, Finished, Available, Finished)

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+



# Spark SQL Example

In [12]:
df1.write.saveAsTable("some_people")

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 14, Finished, Available, Finished)

In [13]:
spark.sql("select * from some_people").show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 15, Finished, Available, Finished)

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [14]:
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 16, Finished, Available, Finished)

DataFrame[]

In [15]:
spark.sql("select * from some_people").show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 17, Finished, Available, Finished)

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
|     frank|  4|     child|
+----------+---+----------+



In [16]:
spark.sql("select * from some_people where life_stage='teenager'").show()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 18, Finished, Available, Finished)

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
+----------+---+----------+



### Spark Structured Streaming Example

In [17]:
{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 19, Finished, Available, Finished)

{'student_name': 'liXXyao', 'graduation_year': '2025', 'major': 'physics'}

In [18]:
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", subscribeTopic)
    .load()
)

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 20, Finished, Available, Finished)

NameError: name 'subscribeTopic' is not defined

In [None]:
schema = StructType([
 StructField("student_name", StringType()),
 StructField("graduation_year", StringType()),
 StructField("major", StringType()),
])

def with_normalized_names(df, schema):
    parsed_df = (
        df.withColumn("json_data", from_json(col("value").cast("string"), schema))
        .withColumn("student_name", col("json_data.student_name"))
        .withColumn("graduation_year", col("json_data.graduation_year"))
        .withColumn("major", col("json_data.major"))
        .drop(col("json_data"))
        .drop(col("value"))
    )
    split_col = split(parsed_df["student_name"], "XX")
    return (
        parsed_df.withColumn("first_name", split_col.getItem(0))
        .withColumn("last_name", split_col.getItem(1))
        .drop("student_name")
    )

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, -1, Cancelled, , Cancelled)

In [None]:
def perform_available_now_update():
    checkpointPath = "data/tmp_students_checkpoint/"
    path = "data/tmp_students"
    return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
        availableNow=True
    ).format("parquet").option("checkpointLocation", checkpointPath).start(path)

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, -1, Cancelled, , Cancelled)

# Spark RDD Example

In [21]:
text_file = spark.sparkContext.textFile("Files/some_words.txt")

counts = (
    text_file.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 23, Finished, Available, Finished)

In [22]:
counts.collect()

StatementMeta(, f384f3db-9432-44c0-8227-da00452a68f1, 24, Finished, Available, Finished)

[('these', 2),
 ('are', 2),
 ('more', 1),
 ('in', 1),
 ('words', 3),
 ('english', 1)]