In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
# spark is an existing SparkSession
df = spark.read.json("C:/spark/examples/src/main/resources/people.json")

In [4]:
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [7]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [8]:
# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [9]:
# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [10]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

In [11]:
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [12]:
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

In [13]:
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [14]:
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [15]:
from pyspark.sql import Row

In [16]:
sc = spark.sparkContext

In [17]:
# Load a text file and convert each line to a Row.
lines = sc.textFile("C:/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

In [18]:
type(lines)

pyspark.rdd.RDD

In [19]:
parts

PythonRDD[41] at RDD at PythonRDD.scala:49

In [20]:
type(parts)

pyspark.rdd.PipelinedRDD

In [21]:
people

PythonRDD[42] at RDD at PythonRDD.scala:49

In [22]:
type(people)

pyspark.rdd.PipelinedRDD

In [23]:
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

In [24]:
schemaPeople.show()

+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+



In [25]:
type(schemaPeople)

pyspark.sql.dataframe.DataFrame

In [26]:
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

In [27]:
teenagers

DataFrame[name: string]

In [28]:
type(teenagers)

pyspark.sql.dataframe.DataFrame

In [29]:
# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

Name: Justin


In [30]:
# Import data types
from pyspark.sql.types import *

In [31]:
sc = spark.sparkContext

In [32]:
# Load a text file and convert each line to a Row.
lines = sc.textFile("C:/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

In [33]:
# The schema is encoded in a string.
schemaString = "name age"

In [34]:
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

In [35]:
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

In [36]:
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

In [37]:
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

In [38]:
results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [39]:
df = spark.read.load("C:/spark/examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

In [40]:
df

DataFrame[name: string, favorite_color: string, favorite_numbers: array<int>]

In [41]:
import numpy as np
import pandas as pd

In [42]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [43]:
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

In [44]:
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

RuntimeError: module compiled against API version 0xb but this version of numpy is 0xa



In [45]:
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

RuntimeError: module 'pyarrow' has no attribute 'compat'
Note: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true. Please set it to false to disable this.