In [21]:
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import *
from datetime import datetime, date

# Start

In [23]:
spark = SparkSession.builder.appName("HelloPySpark").getOrCreate()

21/10/12 21:07:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [24]:
print("spark.version ==", spark.version)

spark.version == 3.1.2


# DataFrame

In [25]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

In [29]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [30]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [32]:
df.dtypes

[('a', 'bigint'),
 ('b', 'double'),
 ('c', 'string'),
 ('d', 'date'),
 ('e', 'timestamp')]

## Selecting and Accessing

In [39]:
df3 = df.withColumn('upper_c', upper(df.c))

In [40]:
df3.show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [33]:
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [36]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [37]:
df.select(col("c")).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [38]:
df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Grouping Data

In [41]:
df2 = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

In [43]:
df2.printSchema()

root
 |-- color: string (nullable = true)
 |-- fruit: string (nullable = true)
 |-- v1: long (nullable = true)
 |-- v2: long (nullable = true)



In [44]:
df2.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [46]:
df2.groupBy('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+



## Word Count

In [47]:
lines = spark.read.text("/opt/spark/README.md")

In [48]:
lines.show(10)

+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a unifie...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Structured St...|
|                    |
|<https://spark.ap...|
+--------------------+
only showing top 10 rows



In [50]:
df4 = lines.withColumn("words", split(col("value"), " "))

In [52]:
df4.show(5)

+--------------------+--------------------+
|               value|               words|
+--------------------+--------------------+
|      # Apache Spark|  [#, Apache, Spark]|
|                    |                  []|
|Spark is a unifie...|[Spark, is, a, un...|
|high-level APIs i...|[high-level, APIs...|
|supports general ...|[supports, genera...|
+--------------------+--------------------+
only showing top 5 rows



In [53]:
df4.printSchema()

root
 |-- value: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [54]:
words = lines.withColumn("wordsArray", split(col("value"), " ")).withColumn("words", explode("wordsArray"))
words.show()

+--------------------+--------------------+-----------+
|               value|          wordsArray|      words|
+--------------------+--------------------+-----------+
|      # Apache Spark|  [#, Apache, Spark]|          #|
|      # Apache Spark|  [#, Apache, Spark]|     Apache|
|      # Apache Spark|  [#, Apache, Spark]|      Spark|
|                    |                  []|           |
|Spark is a unifie...|[Spark, is, a, un...|      Spark|
|Spark is a unifie...|[Spark, is, a, un...|         is|
|Spark is a unifie...|[Spark, is, a, un...|          a|
|Spark is a unifie...|[Spark, is, a, un...|    unified|
|Spark is a unifie...|[Spark, is, a, un...|  analytics|
|Spark is a unifie...|[Spark, is, a, un...|     engine|
|Spark is a unifie...|[Spark, is, a, un...|        for|
|Spark is a unifie...|[Spark, is, a, un...|large-scale|
|Spark is a unifie...|[Spark, is, a, un...|       data|
|Spark is a unifie...|[Spark, is, a, un...|processing.|
|Spark is a unifie...|[Spark, is, a, un...|     

In [55]:
counts = words.groupBy(col("words")).count()
counts.orderBy(desc("count")).show()

+---------+-----+
|    words|count|
+---------+-----+
|         |   73|
|      the|   23|
|       to|   16|
|    Spark|   14|
|      for|   12|
|        a|    9|
|      and|    9|
|       ##|    9|
|      run|    7|
|       on|    7|
|       is|    7|
|      can|    6|
|     also|    5|
|       of|    5|
|       in|    5|
|      you|    4|
|       an|    4|
|   Please|    4|
|        *|    4|
|including|    4|
+---------+-----+
only showing top 20 rows



In [56]:
output = counts.collect()
output

[Row(words='[![PySpark', count=1),
 Row(words='online', count=1),
 Row(words='graphs', count=1),
 Row(words='["Building', count=1),
 Row(words='documentation', count=3),
 Row(words='command,', count=2),
 Row(words='abbreviated', count=1),
 Row(words='overview', count=1),
 Row(words='rich', count=1),
 Row(words='set', count=2),
 Row(words='-DskipTests', count=1),
 Row(words='1,000,000,000:', count=2),
 Row(words='name', count=1),
 Row(words='["Specifying', count=1),
 Row(words='stream', count=1),
 Row(words='run:', count=1),
 Row(words='not', count=1),
 Row(words='programs', count=2),
 Row(words='tests', count=2),
 Row(words='./dev/run-tests', count=1),
 Row(words='will', count=1),
 Row(words='[run', count=1),
 Row(words='particular', count=2),
 Row(words='Alternatively,', count=1),
 Row(words='must', count=1),
 Row(words='using', count=3),
 Row(words='./build/mvn', count=1),
 Row(words='you', count=4),
 Row(words='MLlib', count=1),
 Row(words='DataFrames,', count=1),
 Row(words='variab

## SQL

In [57]:
df2.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) as count from tableA").show()

+-----+
|count|
+-----+
|    8|
+-----+



In [58]:
spark.stop()