In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg

Tutorial adapted from the Apache Spark examples page: https://spark.apache.org/examples.html

# Spark DataFrame example

In [13]:
# Obtain the notebook's Spark session via the builder's getOrCreate().
spark = (
    SparkSession.builder
    .appName("demo")
    .getOrCreate()
)

# Build a small two-column DataFrame (first_name, age) and display it.
df = spark.createDataFrame(
    data=[("sue", 32), ("li", 3), ("bob", 75), ("heo", 13)],
    schema=["first_name", "age"],
)
df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



In [16]:
# Derive a life_stage label from age:
#   under 13 -> "child", 13 through 19 -> "teenager", anything else -> "adult".
age_col = col("age")
life_stage_col = (
    when(age_col < 13, "child")
    .when(age_col.between(13, 19), "teenager")
    .otherwise("adult")
)
df1 = df.withColumn("life_stage", life_stage_col)
df1.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [17]:
# Keep only the rows whose life_stage is teenager or adult
df1.filter(col("life_stage").isin("teenager", "adult")).show()


+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [19]:
# Mean of the age column across every row
df1.agg(avg("age")).show()
# SQL alternative, left disabled:
# spark.sql("select avg(age) from {df1}", df1=df1).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



In [20]:
# Show average age grouped by life_stage.
# The column is named explicitly: a bare .avg() averages every numeric column,
# so any numeric column added to df1 later would otherwise appear here too.
df1.groupBy("life_stage").avg("age").show()
# SQL alternative, left disabled:
# spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+



In [None]:
# Persist df1 as the table some_people.
# Available save modes:
#   append                - append contents of this DataFrame to existing data
#   overwrite             - overwrite existing data
#   error / errorifexists - throw an exception if data already exists
#   ignore                - silently ignore this operation if data already exists
df1.write.saveAsTable("some_people", mode="overwrite")

# Read the whole table back
spark.sql("select * from some_people").show()

# Add one row through SQL, then list only the teenagers
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")
spark.sql("select * from some_people where life_stage='teenager'").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       bob| 75|     adult|
|       sue| 32|     adult|
|        li|  3|     child|
+----------+---+----------+

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
+----------+---+----------+

