# Spark SQL notebook

**can work without Spark cluster for some cases**

**!!!do not use space in SparkSession/SparkContext's parameters such as app name**

## rdd/json/txt/parquet => DataFrame

## DataFrame => Table => SQL


https://spark.apache.org/docs/latest/sql-programming-guide.html

examples\src\main\python\sql\basic.py


In [40]:
# notebook config
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

# spark bootstrap
import os
import sys

####################################################
##! Windows env
## set working dir
os.chdir('C:\\nili\\my-git-projects\\learn_python')
# spark_path = 'C:/dev/spark-2.2.1-bin-hadoop2.7'
# hadoop_path = 'C:/dev/winutils/hadoop-2.7.1'
spark_path = "C:\\dev\\spark-2.2.1-bin-hadoop2.7"
hadoop_path='C:\\dev\\winutils\\hadoop-2.7.1'
####################################################

os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = hadoop_path
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

# spark import
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *

In [3]:
# With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.
# SparkSession vs. SparkContext: sc = spark.sparkContext
spark = SparkSession.builder \
    .master('local[2]') \
    .appName("test2") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
# create dataframe
# spark is an existing SparkSession
df = spark.read.json("data/people.json")
# Displays the content of the DataFrame to stdout
#! must use show(), notebook cannot print the variable
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# Tree format schema
df.printSchema()

# Select only the "name" column
df.select("name").show()
#xxx df['name'].show()

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

# Select people older than 21
df.filter(df['age'] > 21).show()
#! same as above, like pandas' dataframe
df[df['age'] > 21].show()

# Count people by age
df.groupBy("age").count().show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



# DataFrame=>Table=>SQL

SQL can be run over DataFrames that have been registered as a table.

In [46]:
#!! Register the DataFrame as a SQL temporary view
# (treat dataframe as SQL)
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

#! convert to pandas 
pandasDF=sqlDF.toPandas()
pandasDF


+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



Unnamed: 0,name,favorite_color,favorite_numbers
0,Alyssa,,"[3, 9, 15, 20]"
1,Ben,red,[]


In [9]:
# Register the DataFrame as a global 
df.createGlobalTempView("people")
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [10]:
spark.sql("""
SELECT * FROM global_temp.people
order by age
""").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  19| Justin|
|  30|   Andy|
+----+-------+



In [11]:
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [20]:
# $example on:schema_inferring$

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
# return an RDD
lines = sc.textFile("data/people.txt")
# iterate each line and split it by ','
#! map is only for RDD, DataFrame has to be converted to RDD first
parts = lines.map(lambda l: l.split(","))
#! define 'name,age' as schema
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))


# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
schemaPeople.show()

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)

+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+

+------+
|  name|
+------+
|Justin|
+------+

Name: Justin


In [22]:
# $example on:programmatic_schema$
sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("data/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

#! The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
fields
schema = StructType(fields)
schema

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()


[StructField(name,StringType,true), StructField(age,StringType,true)]

StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



# Data source
A simple example demonstrating Spark SQL data sources (json, parquet).

Run with:

  ./bin/spark-submit examples/src/main/python/sql/datasource.py

## parquet

In [24]:
# $example on:generic_load_save_functions$
file="data/users.parquet"
df = spark.read.load(file)
df.select("name", "favorite_color").write.save("data/namesAndFavColors.parquet",mode='overwrite')
# $example off:generic_load_save_functions$

# $example on:write_partitioning$
#! stored in spark-warehouse by default
# spark.sql.warehouse.dir (default: ${system:user.dir}/spark-warehouse) i
df.write.partitionBy("favorite_color").format("parquet").save("data/namesPartByColor.parquet",mode='overwrite')
# $example off:write_partitioning$

In [27]:
df = spark.read.load(file)
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [44]:
# $example on:write_partition_and_bucket$
file = "data/users.parquet"
df = spark.read.parquet(file)
#.bucketBy(42, "name") #??? no bucketBy in 2.2.1
df.write.partitionBy("favorite_color").saveAsTable(
    "people_partitioned_bucketed", mode='overwrite')
# $example off:write_partition_and_bucket$

In [32]:
# $example on:manual_load_options$
df = spark.read.load("data/people.json", format="json")
df.select("name", "age").write.save("data/namesAndAges.parquet", format="parquet",mode='overwrite')
# $example off:manual_load_options$

# $example on:write_sorting_and_bucketing$
df.write.saveAsTable("people_bucketed")
# df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
# $example off:write_sorting_and_bucketing$

spark.sql("DROP TABLE IF EXISTS people_bucketed")
spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")

DataFrame[]

DataFrame[]

### Run SQL on files directly

In [45]:
# $example on:direct_sql$
df = spark.sql("SELECT * FROM parquet.`data/users.parquet`")
df.show()
# $example off:direct_sql$

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



## avro with databricks spark-avro

In [None]:
# Creates a DataFrame from a specified directory
df = spark.read.format("com.databricks.spark.avro").load("src/test/resources/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("com.databricks.spark.avro").save("/tmp/output")

## json

In [34]:
# $example on:basic_parquet_example$
peopleDF = spark.read.json("data/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("data/people.parquet",mode="overwrite")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("data/people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()


+------+
|  name|
+------+
|Justin|
+------+



In [35]:
# $example on:schema_merging$
# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(
    sc.parallelize(range(1, 6)).map(lambda i: Row(single=i, double=i**2)))
squaresDF.write.parquet("data/test_table/key=1", mode='overwrite')

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(
    sc.parallelize(range(6, 11)).map(lambda i: Row(single=i, triple=i**3)))
cubesDF.write.parquet("data/test_table/key=2", mode='overwrite')

# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.

root
 |-- double: long (nullable = true)
 |-- single: long (nullable = true)
 |-- triple: long (nullable = true)
 |-- key: integer (nullable = true)



In [36]:
# $example on:json_dataset$
# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "data/people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()


root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+------+
|  name|
+------+
|Justin|
+------+

+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+



## jdbc

In [39]:
# $example on:jdbc_dataset$
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
# $example off:jdbc_dataset$



DataFrame[address: struct<city:string,state:string>, name: string]

# Hive
A simple example demonstrating Spark SQL Hive integration.

Run with:

  ./bin/spark-submit examples/src/main/python/sql/hive.py

In [None]:
from os.path import expanduser, join, abspath
# $example on:spark_hive$
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
# $example off:spark_hive$

spark.stop()
