In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, split
from pyspark.sql.types import StringType, StructField, StructType

In [2]:
# Obtain the Spark session for this notebook, registered under the app name "demo".
spark = (
    SparkSession.builder
    .appName("demo")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/23 03:55:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Small in-memory sample: one (first_name, age) tuple per person.
people_rows = [("sue", 32), ("li", 3), ("bob", 75), ("heo", 13)]
people_columns = ["first_name", "age"]
df = spark.createDataFrame(people_rows, people_columns)

In [5]:
# Print the sample rows as a text table to check the DataFrame contents.
df.show()

                                                                                

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



In [5]:
from pyspark.sql.functions import col, when

# Bucket each person by age: under 13 -> "child", 13 through 19 -> "teenager",
# anything else -> "adult".
age_col = col("age")
life_stage_expr = (
    when(age_col < 13, "child")
    .when(age_col.between(13, 19), "teenager")
    .otherwise("adult")
)
df1 = df.withColumn("life_stage", life_stage_expr)

In [7]:
# Employee sample rows; the field order of each tuple matches empColumns below.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empColumns = [
    "emp_id",
    "name",
    "superior_emp_id",
    "year_joined",
    "emp_dept_id",
    "gender",
    "salary",
]


In [8]:
# Build the employee DataFrame from the row tuples and their column names.
df2 = spark.createDataFrame(data=emp, schema=empColumns)

In [7]:
# Show only the rows whose life_stage is one of the listed values.
(
    df1
    .where(col("life_stage").isin(["teenager", "adult"]))
    .show()
)

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [8]:
from pyspark.sql.functions import avg

# Average of the age column across every row of df1.
(
    df1
    .select(avg("age"))
    .show()
)



+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



                                                                                

In [9]:
# Average age within each life_stage group.
(
    df1
    .groupBy("life_stage")
    .avg("age")
    .show()
)



+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+



                                                                                

In [17]:
# DataFrame.show() prints the rows and returns None, so chaining it onto the
# assignment would leave df4 bound to None. Build the filtered DataFrame
# first, then display it in a separate statement.
df4 = df1.where((col("age") > 0) & (col("life_stage") == "child"))
df4.show()

                                                                                

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|        li|  3|     child|
+----------+---+----------+



In [None]:
# Average age computed through SQL; the {x} placeholder is supplied by the x=df1 keyword argument.
spark.sql("select avg(age) from {x}",x=df1).show()

In [None]:
# Persist df1 as a table named "some_people".
# NOTE(review): no save mode is set, so re-running this cell once the table
# exists presumably raises — confirm, and consider an explicit .mode(...).
df1.write.saveAsTable("some_people")

In [None]:
# Read the saved table back through SQL and print its rows.
spark.sql("select * from some_people").show()

In [None]:
# Append one row to the table; the values are positional, presumably in the
# order first_name, age, life_stage (the column order of df1).
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")

In [None]:
# Query the table again to print its contents after the INSERT statement.
spark.sql("select * from some_people").show()

In [None]:
# Restrict the table to rows whose life_stage equals 'teenager'.
spark.sql("select * from some_people where life_stage='teenager'").show()

In [None]:
# Streaming read from Kafka: subscribe to one topic via the listed bootstrap servers.
# NOTE(review): subscribeTopic is not defined in any visible cell, and this
# rebinds `df` (previously the static sample DataFrame) — confirm both are intended.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", subscribeTopic)
    .load()
)

In [None]:
# Schema describing a student record: three fields, each read as a string.
# StructType / StructField / StringType come from pyspark.sql.types (imported
# in the notebook's first cell).
schema = StructType(
    [
        StructField("student_name", StringType()),
        StructField("graduation_year", StringType()),
        StructField("major", StringType()),
    ]
)

def with_normalized_names(df, schema):
    """Parse JSON records from the ``value`` column and split the student name.

    The ``value`` column is cast to string and parsed with ``schema``. The
    parsed ``student_name``, ``graduation_year`` and ``major`` fields become
    top-level columns, and ``student_name`` is then split on the literal
    separator ``"XX"`` into ``first_name`` and ``last_name``.

    Args:
        df: DataFrame with a ``value`` column that can be cast to a JSON string.
        schema: Schema handed to ``from_json``; it is expected to define
            ``student_name``, ``graduation_year`` and ``major``.

    Returns:
        ``df`` with ``graduation_year``, ``major``, ``first_name`` and
        ``last_name`` added, and with ``value``, the intermediate
        ``json_data`` and ``student_name`` removed.
    """
    parsed_df = (
        df.withColumn("json_data", from_json(col("value").cast("string"), schema))
        .withColumn("student_name", col("json_data.student_name"))
        .withColumn("graduation_year", col("json_data.graduation_year"))
        .withColumn("major", col("json_data.major"))
        # The raw payload and the parsed struct are no longer needed once
        # their fields have been promoted to columns.
        .drop("json_data", "value")
    )
    # Element 0 is the text before the first "XX", element 1 the text after it.
    name_parts = split(parsed_df["student_name"], "XX")
    return (
        parsed_df.withColumn("first_name", name_parts.getItem(0))
        .withColumn("last_name", name_parts.getItem(1))
        .drop("student_name")
    )


In [None]:
./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

In [3]:


# New DataFrame queries

In [4]:
# Load the training CSV, using its first line as the header and letting Spark
# infer each column's type.
train_df = spark.read.load(
    "../Datasets/train.csv",
    format="csv",
    inferSchema="True",
    header="True",
)

                                                                                

In [5]:
# Second name for the loaded training data. This rebinds df2, which an earlier
# cell assigned to the employee DataFrame.
df2 = train_df

In [6]:
# Preview the loaded training rows.
df2.show()

24/01/23 04:00:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition

In [7]:
# Print each column's name, inferred type and nullability.
df2.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

In [16]:
# Count the distinct LotFrontage values that end in a lowercase "a".
# The column is spelled as it appears in the schema ("LotFrontage") instead of
# relying on case-insensitive name resolution.
# NOTE(review): this returned 0 even though the value 'NA' is present, so the
# lowercase pattern does not match 'NA' — confirm whether '%A' was intended.
(
    df2.select("LotFrontage")
    .distinct()
    .filter("LotFrontage like '%a'")
    .count()
)

0

In [19]:
# Sample up to 30 distinct LotFrontage values. The column is spelled as in the
# schema ("LotFrontage"), so the returned Row field carries the real column name.
df2.select("LotFrontage").distinct().take(30)

[Row(LotFrontAge='51'),
 Row(LotFrontAge='124'),
 Row(LotFrontAge='54'),
 Row(LotFrontAge='101'),
 Row(LotFrontAge='138'),
 Row(LotFrontAge='69'),
 Row(LotFrontAge='112'),
 Row(LotFrontAge='42'),
 Row(LotFrontAge='73'),
 Row(LotFrontAge='87'),
 Row(LotFrontAge='64'),
 Row(LotFrontAge='30'),
 Row(LotFrontAge='34'),
 Row(LotFrontAge='59'),
 Row(LotFrontAge='160'),
 Row(LotFrontAge='85'),
 Row(LotFrontAge='52'),
 Row(LotFrontAge='35'),
 Row(LotFrontAge='NA'),
 Row(LotFrontAge='71'),
 Row(LotFrontAge='98'),
 Row(LotFrontAge='47'),
 Row(LotFrontAge='99'),
 Row(LotFrontAge='110'),
 Row(LotFrontAge='107'),
 Row(LotFrontAge='96'),
 Row(LotFrontAge='43'),
 Row(LotFrontAge='100'),
 Row(LotFrontAge='70'),
 Row(LotFrontAge='174')]

In [20]:
# Count the distinct LotFrontage values matched by the catch-all pattern '%'.
# The column is spelled as in the schema ("LotFrontage").
(
    df2.select("LotFrontage")
    .distinct()
    .where("LotFrontage like '%'")
    .count()
)

111

In [26]:
# Number of rows whose YearBuilt is later than 1995.
df2.filter("YearBuilt > 1995").count()

467