In [24]:
# Create (or reuse) the SparkSession; later cells refer to it as `spark`.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    # presumably a placeholder key/value -- replace with real settings as needed
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [4]:
# Creating DataFrames
# `spark` is an existing SparkSession; read the JSON file into a DataFrame.
df = spark.read.json("people 2.json")
# Display the content of the DataFrame on stdout.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
#Untyped Dataset Operations (aka DataFrame Operations)


**In Python it's possible to access a DataFrame's columns either by attribute (df.age) or by indexing (df['age']). While the former is convenient for interactive data exploration, it is highly recommended to use the latter form, which keeps working even when a column name collides with an attribute of the DataFrame class.**

In [6]:
# `spark` and `df` come from the previous example.
# Print the schema in a tree format: one line per column with its type and nullability.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
# Project the DataFrame down to its "name" column and display the result.
df.select(df["name"]).show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [8]:
# Select everybody, but increment the age by 1.
# A null age stays null after the addition.
df.select(
    df["name"],
    df["age"] + 1,
).show()


+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [9]:
# Keep only the people older than 21 (`where` is an alias of `filter`).
df.where(df["age"] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [10]:
# Count people by age; rows with a null age form their own group.
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [11]:
#Running SQL Queries Programmatically


**The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.**

In [12]:
# Register the DataFrame as a SQL temporary view named "people"
# (an existing view with that name is replaced).
df.createOrReplaceTempView("people")

In [13]:
# Query the "people" temporary view with SQL; the result is itself a DataFrame.
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [14]:
#Global Temporary View

**Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database `global_temp`, and we must use the qualified name to refer to it, e.g. `SELECT * FROM global_temp.view1`.**

In [15]:
# Register the DataFrame as a global temporary view.
# createOrReplaceGlobalTempView (rather than createGlobalTempView) keeps this cell
# re-runnable: the plain "create" variant raises if a global view named "people"
# already exists in the running Spark application.
df.createOrReplaceGlobalTempView("people")

In [17]:
# A global temporary view is tied to the system-preserved database 'global_temp',
# so the query must use the qualified name global_temp.people.
spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [18]:
# A global temporary view is cross-session: a brand-new session can query it too.
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [19]:
# Creating Datasets
#WARNING: the typed Dataset API is available in Scala and Java only

*Datasets are similar to RDDs; however, instead of using Java serialization or Kryo,
they use a specialized Encoder to serialize the objects for processing or
transmitting over the network. While both encoders and standard serialization
are responsible for turning an object into bytes, encoders are code generated dynamically and
use a format that allows Spark to perform many operations like filtering,
sorting and hashing without deserializing the bytes back into an object.*

In [None]:
#case class Person(name: String, age: Long)

#Encoders are created for case classes
#val caseClassDS = Seq(Person("Andy", 32)).toDS()
#caseClassDS.show()

In [1]:
#Encoders for most common types are automatically provided by importing spark.implicits._
#val primitiveDS = Seq(1, 2, 3).toDS()
#primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

#DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
#val path = "examples/src/main/resources/people.json"
#val peopleDS = spark.read.json(path).as[Person]
#peopleDS.show()

In [2]:
#Interoperating with RDDs

*
- Spark SQL supports two different methods for converting existing RDDs into
Datasets. The first method uses reflection to infer the schema of an RDD that
contains specific types of objects. This reflection-based approach leads
to more concise code and works well when you already know the schema while
writing your Spark application.

- The second method for creating Datasets is through a programmatic interface that
allows you to construct a schema and then apply it to an existing RDD. While this
method is more verbose, it allows you to construct Datasets when the columns
and their types are not known until runtime.

*

In [3]:
#Inferring the Schema Using Reflection

*Spark SQL can convert an RDD of Row objects to a DataFrame,
inferring the datatypes. Rows are constructed by passing a list of
key/value pairs as kwargs to the Row class. The keys of this list
define the column names of the table, and the types are inferred by
sampling the whole dataset, similar to the inference that is performed on JSON
files.*

In [8]:
from pyspark.sql import Row

sc = spark.sparkContext

# Read people.txt and turn every comma-separated line into a Row(name, age).
lines = sc.textFile("people.txt")
parts = lines.map(lambda line: line.split(","))
people = parts.map(lambda fields: Row(name=fields[0], age=int(fields[1])))

# Let Spark infer the schema from the Rows, then expose the result to SQL as "people".
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# Any DataFrame registered as a view can be queried with SQL.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <=19")

# A SQL result is itself a DataFrame; `.rdd` exposes its content as an RDD of Row objects.
teenNames = teenagers.rdd.map(lambda row: "Name: " + row.name).collect()
for name in teenNames:
    print(name)
    

Name: Justin


In [9]:
# Programmatically Specifying the Schema

When a dictionary of kwargs cannot be defined ahead of time (for example,
the structure of records is encoded in a string, or a text dataset will be
parsed and fields will be projected differently for different users),
a DataFrame can be created programmatically with three steps.
1. Create a RDD of tuples or lists from the original RDD;
2. Create the schema represented by a StructType matching the structure
of tuples or lists in the RDD created in the step 1.
3. Apply the schema to the RDD via createDataFrame method provided by 
SparkSession

In [11]:
# Import only the data types this cell uses; a star import would pull every
# pyspark.sql.types name into the notebook namespace and hide where names come from.
from pyspark.sql.types import StringType, StructField, StructType

sc = spark.sparkContext

# Load a text file and split each line on commas.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a (name, age) tuple; the age is kept as a stripped string.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string: one column name per whitespace-separated token.
schemaString = "name age"

# Every field is declared as a nullable string column.
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Create a temporary view using the DataFrame (replaces any existing "people" view).
schemaPeople.createOrReplaceTempView("people")

# SQL can run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



**Aggregations**

The built-in DataFrame functions provide common aggregations such as count(),
countDistinct(), avg(), max(), min(), etc. While those functions are designed for
DataFrames, Spark SQL also has type-safe versions for some of them in Scala and
Java to work with strongly typed Datasets. Moreover, users are not limited to the predefined
aggregate functions and can create their own.

***Generic Load/Save Functions***

In [12]:
# Load with the default data source (no format is specified here).
df = spark.read.load("users.parquet")
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when the output path already exists from a previous run.
df.select("name", "favorite_color").write.mode("overwrite").save("namesAndFavColors.parquet")

In the simplest form, the default data source (parquet unless otherwise configured by
spark.sql.sources.default) will be used for all operations.

In [13]:
#Manually Specifying Operations

You can also manually specify the data source that will be used
along with any extra options that you would like to pass to the data source.
Data sources are specified by their fully qualified name
(i.e. org.apache.spark.sql.parquet), but for built-in sources you can
also use their short names (json, parquet, jdbc, orc, libsvm, csv, text).
DataFrames loaded from any data source type can be converted into
other types using this syntax.

In [14]:
# Read JSON by naming the source format explicitly, then write the selection as Parquet.
df = spark.read.load("people 2.json", format="json")
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when the output path already exists from a previous run.
df.select("name", "age").write.mode("overwrite").save("namesAndAges.parquet", format="parquet")

In [15]:
#To load a CSV file you can use

In [16]:
# Load a CSV file, treating the first line as a header and inferring column types.
# NOTE(review): the separator is ":" here; Spark's bundled sample people.csv is
# ";"-delimited -- confirm against the actual file, otherwise every row is read as one column.
df = spark.read.load("people.csv", format="csv", sep=":", inferSchema="true", header="true")

You can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create a bloom filter on favorite_color and
use dictionary encoding for name and favorite_color. For Parquet,
there exists parquet.enable.dictionary, too.
To find more detailed information about the extra ORC/Parquet options, visit
the official Apache ORC/Parquet websites.

In [18]:
df = spark.read.orc("users.orc")
# Write ORC with a bloom filter on favorite_color and a dictionary-encoding threshold of 1.0.
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when the output path already exists from a previous run.
(df.write.format("orc")
    .mode("overwrite")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .save("users_with_options.orc"))

In [19]:
#Run SQL on files directly

Instead of using read API to load a file into DataFrame and query it,
you can query that file directly with SQL

In [20]:
# Query the Parquet file directly: the table reference is written as <format>.`<path>`.
df = spark.sql("SELECT * FROM parquet.`users.parquet`")

In [21]:
#Bucketing, Sorting and Partitioning

For file-based data source, it is also possible to bucket
and sort or partition the output. Bucketing and sorting are applicable
only to persistent tables:

In [28]:
#df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

In [29]:
df = spark.read.parquet("users.parquet")
# Persist as a table partitioned by favorite_color and bucketed by name into 42 buckets.
# mode("overwrite") keeps the cell re-runnable: with the default save mode,
# saveAsTable raises once the table exists from a previous run.
(df
    .write
    .mode("overwrite")
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))

In [30]:
# Write Parquet files partitioned by favorite_color.
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when the output path already exists from a previous run.
df.write.mode("overwrite").partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

In [31]:
#Parquet Files

Parquet is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files
that automatically preserves the schema of the original data. When
writing Parquet files, all columns are automatically converted to be
nullable for compatibility reasons.

In [33]:
#Loading Data Programmatically

In [34]:
# Load the JSON people data into a DataFrame.
peopleDF = spark.read.json("people 2.json")

In [35]:
# DataFrames can be saved as Parquet files, maintaining the schema
# information.

In [37]:
# Save the DataFrame as Parquet, which keeps the schema information.
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when "people.parquet" already exists from a previous run.
peopleDF.write.mode("overwrite").parquet("people.parquet")

In [38]:


# Read back the Parquet output written to "people.parquet".
# Parquet files are self-describing, so the schema is preserved.
# The result of loading a Parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

In [39]:

# Parquet files can also be used to create a temporary view and then
# be used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()

+------+
|  name|
+------+
|Justin|
+------+



In [40]:
#Schema Merging
# By default schema merging is turned off,
# but it can be turned on as follows:

In [41]:
from pyspark.sql import Row

# `spark` is from the previous example.
sc = spark.sparkContext

# Create a simple DataFrame and store it in the partition directory key=1.
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when the partition directory already exists from a previous run.
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.mode("overwrite").parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory (key=2),
# adding a new column ("triple") and dropping an existing one ("double").
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.mode("overwrite").parquet("data/test_table/key=2")

# Read the partitioned table with schema merging switched on, so the result
# carries the columns of both partitions plus the partition column "key".
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()


root
 |-- double: long (nullable = true)
 |-- single: long (nullable = true)
 |-- triple: long (nullable = true)
 |-- key: integer (nullable = true)



In [42]:
# JSON Files

*Spark SQL can automatically infer the schema of a JSON dataset and
load it as a DataFrame. This conversion can be done using SparkSession.read.json on
a JSON file.*

*Note that the file that is offered as a JSON file is not a typical JSON file.
Each line must contain a separate, self-contained valid JSON object.*

*For a regular multi-line JSON file, set the multiLine parameter to True.*

In [43]:
# `spark` is from the previous example; keep a handle on its SparkContext,
# which is used below to build an RDD of JSON strings.
sc = spark.sparkContext

In [44]:
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing
# text files

In [45]:
# Path of the JSON dataset to load.
path = "people 2.json"

In [46]:
# Read the JSON dataset at `path` into a DataFrame.
peopleDF = spark.read.json(path)

In [48]:
# The inferred schema can be inspected with the printSchema() method.
peopleDF.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [49]:
# Create a temporary view named "people" from the DataFrame
# (an existing view with that name is replaced).
peopleDF.createOrReplaceTempView("people")

In [50]:
# SQL statements can be run with the sql method provided by `spark`.
# BETWEEN 13 AND 19 is inclusive on both ends.
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")

In [51]:
# Display the names returned by the teenager query.
teenagerNamesDF.show()

+------+
|  name|
+------+
|Justin|
+------+



In [56]:
# Alternatively, a DataFrame can be created for a JSON dataset represented
# by an RDD[String] storing one JSON object per string.
# This single record carries a nested "address" object.
jsonStrings = ['{"name":"Nick","address":{"city":"Columbus","state":"Ohio"}}']

In [57]:
# Distribute the JSON strings as an RDD, then let Spark parse them into a DataFrame.
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]|Nick|
+----------------+----+



****Hive Tables****

Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them automatically. Note that these Hive dependencies must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.

In [58]:
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

In [59]:
# warehouse_location points to the default location for managed databases and tables.
# abspath resolves 'spark-warehouse' against the current working directory.
warehouse_location = abspath('spark-warehouse')

In [60]:
# Build a Hive-enabled SparkSession whose managed tables live under `warehouse_location`.
# The config key must be spelled "spark.sql.warehouse.dir": a misspelled key is
# accepted silently and the warehouse location is then never applied.
# NOTE(review): getOrCreate() hands back an already-running session when one exists
# (e.g. the one built in the first cell) -- confirm the Hive/warehouse settings take
# effect, or restart the kernel and build this session first.
spark = (
    SparkSession
    .builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

In [61]:
# `spark` is an existing (Hive-enabled) SparkSession.
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
# OVERWRITE replaces the table contents instead of appending to them, so re-running
# this cell does not load kv1.txt a second time and duplicate every row.
spark.sql("LOAD DATA LOCAL INPATH 'kv1.txt' OVERWRITE INTO TABLE src")

DataFrame[]

In [62]:
# Queries are expressed in HiveQL; show() prints only the first 20 rows.
spark.sql("SELECT * FROM src").show()

+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
|409|val_409|
|255|val_255|
|278|val_278|
| 98| val_98|
|484|val_484|
|265|val_265|
|193|val_193|
|401|val_401|
|150|val_150|
|273|val_273|
|224|val_224|
|369|val_369|
| 66| val_66|
|128|val_128|
|213|val_213|
+---+-------+
only showing top 20 rows



In [63]:
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()

+--------+
|count(1)|
+--------+
|     500|
+--------+



In [65]:
# The results of SQL queries are themselves DataFrames
# and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

In [66]:
# The items in a DataFrame are of type Row; this cell reads each column by
# name (row.key, row.value) and formats one string per row.
stringDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringDS.collect():
    print(record)

Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 0, Value: val_0
Key: 2, Value: val_2
Key: 4, Value: val_4
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 5, Value: val_5
Key: 8, Value: val_8
Key: 9, Value: val_9


In [67]:
# You can use DataFrames to create temporary views
# within a SparkSession.
# NOTE(review): "vslue" looks like a typo for "value"; it is left unchanged because the
# displayed join outputs show a column with this exact name -- confirm before renaming.
Record = Row("key", "vslue")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

+---+------+---+------+
|key| vslue|key| value|
+---+------+---+------+
|  2| val_2|  2| val_2|
|  4| val_4|  4| val_4|
|  5| val_5|  5| val_5|
|  5| val_5|  5| val_5|
|  5| val_5|  5| val_5|
|  8| val_8|  8| val_8|
|  9| val_9|  9| val_9|
| 10|val_10| 10|val_10|
| 11|val_11| 11|val_11|
| 12|val_12| 12|val_12|
| 12|val_12| 12|val_12|
| 15|val_15| 15|val_15|
| 15|val_15| 15|val_15|
| 17|val_17| 17|val_17|
| 18|val_18| 18|val_18|
| 18|val_18| 18|val_18|
| 19|val_19| 19|val_19|
| 20|val_20| 20|val_20|
| 24|val_24| 24|val_24|
| 24|val_24| 24|val_24|
+---+------+---+------+
only showing top 20 rows



In [68]:
# JDBC To Other Databases

In [71]:
# # Note: JDBC loading and saving can be achieved via either the
# # load/save or jdbc methods
# # Loading data from a JDBC source
# jdbcDF = spark.read\
#     .format("jdbc")\
#     .option("url", "jdbc:postgresql:dbserver")\
#     .option("dbtable", "schema.tablename")\
#     .option("user", "username")\
#     .option("password", "password")\
#     .load()

# jdbcDF2 = spark.read\
#     .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
#          properties={"user": "username", "password": "password"})

# # Specifying dataframe column data types on read
# jdbcDF3 = spark.read\
#     .format("jdbc")\
#     .option("url", "jdbc:postgresql:dbserver")\
#     .option("dbtable", "schema.tablename")\
#     .option("user", "username")\
#     .option("password", "password")\
#     .option("customSchema", "id DECIMAL(38, 0), name STRING")\
#     .load()


# # Saving data to a JDBC source
# jdbcDF.write \
#     .format("jdbc") \
#     .option("url", "jdbc:postgresql:dbserver") \
#     .option("dbtable", "schema.tablename") \
#     .option("user", "username") \
#     .option("password", "password") \
#     .save()

# jdbcDF2.write \
#     .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
#           properties={"user": "username", "password": "password"})

# # Specifying create table column data types on write
# jdbcDF.write \
#     .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
#     .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
#           properties={"user": "username", "password": "password"})

In [72]:
#Apache Avro Data Source 

*Load and Save functions*

**To load/save data in Avro format, you need to specify the data source
option format as avro (or org.apache.spark.sql.avro)**

In [75]:
# df = spark.read.format("avro").load("users.avro")
# df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

In [76]:
# Performance Tuning

In [77]:
# Broadcast hint: mark the "src" table for broadcasting before joining it
# with "records" on the shared "key" column.

from pyspark.sql.functions import broadcast

broadcast(spark.table("src")).join(spark.table("records"), "key").show()

+---+------+------+
|key| value| vslue|
+---+------+------+
|  2| val_2| val_2|
|  4| val_4| val_4|
|  5| val_5| val_5|
|  5| val_5| val_5|
|  5| val_5| val_5|
|  8| val_8| val_8|
|  9| val_9| val_9|
| 10|val_10|val_10|
| 11|val_11|val_11|
| 12|val_12|val_12|
| 12|val_12|val_12|
| 15|val_15|val_15|
| 15|val_15|val_15|
| 17|val_17|val_17|
| 18|val_18|val_18|
| 18|val_18|val_18|
| 19|val_19|val_19|
| 20|val_20|val_20|
| 24|val_24|val_24|
| 24|val_24|val_24|
+---+------+------+
only showing top 20 rows



In [78]:
# PySpark Usage Guide for Pandas with Apache Arrow

In [79]:
import numpy as np
import pandas as pd

In [80]:
# Enable Arrow-based columnar data transfers.
# NOTE(review): Spark 3.x renamed this key to "spark.sql.execution.arrow.pyspark.enabled";
# confirm which key matches the Spark version in use.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [81]:
# Generate a Pandas DataFrame of 100 rows x 3 columns of random floats.
# No seed is set, so the values differ on every run.
pdf = pd.DataFrame(np.random.rand(100,3))

In [82]:
# Create a Spark DataFrame from the Pandas DataFrame (Arrow was enabled in the config cell).
df = spark.createDataFrame(pdf)

In [83]:
# Convert the Spark DataFrame back to a Pandas DataFrame (Arrow was enabled in the config cell).
result_pdf = df.select("*").toPandas()

In [85]:
# Pandas UDFs (a.k.a. Vectorized UDFs)
# Currently there are 2 types of Pandas UDF: Scalar and Grouped Map

Scalar

Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such as select and withColumn. The Python function should take pandas.Series as inputs and return a pandas.Series of the same length. Internally, Spark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.

The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.

In [86]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    """Return the product ``a * b``.

    In this notebook the arguments are pandas.Series, so the multiplication is
    element-wise and the result is a Series of the same length.
    """
    product = a * b
    return product

# Wrap the plain Python function as a Pandas UDF that returns LongType values.
multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data,
# so it is checked here on a small Series before being used through Spark.
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))

0    1
1    4
2    9
dtype: int64


In [87]:
# Create a one-column ("x") Spark DataFrame from the Series; 'spark' is an existing SparkSession.
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute the function as a Spark vectorized UDF, multiplying column x by itself.
df.select(multiply(col("x"), col("x"))).show()

+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



In [88]:
# Grouped Map

The following example shows how to use groupby().apply() to subtract the mean from each value in the group.

In [89]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Five (id, v) rows spread over two groups: id 1 and id 2.
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"),
)


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    """Center column ``v`` of one group by subtracting that group's mean.

    ``pdf`` is a pandas.DataFrame; the returned frame is ``pdf`` with ``v``
    replaced by ``v - v.mean()``.
    """
    centered = pdf.v - pdf.v.mean()
    return pdf.assign(v=centered)


# Apply the UDF once per "id" group and show the combined result.
df.groupby("id").apply(subtract_mean).show()

+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+



In [90]:
# Grouped Aggregate

The following example shows how to use this type of UDF to compute mean with groupBy and window operations:

In [94]:
# from pyspark.sql.functions import pandas_udf, PandasUDFType
# from pyspark.sql import Window

# df = spark.createDataFrame(
#     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
#     ("id", "v"))

# @pandas_udf("double", PandasUDFType.GROUPED_AGG)
# def mean_udf(v):
#     return v.mean()

# df.groupby("id").agg(mean_udf(df['v'])).show()
# # +---+-----------+
# # | id|mean_udf(v)|
# # +---+-----------+
# # |  1|        1.5|
# # |  2|        6.0|
# # +---+-----------+

# w = Window \
#     .partitionBy('id') \
#     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# # +---+----+------+
# # | id|   v|mean_v|
# # +---+----+------+
# # |  1| 1.0|   1.5|
# # |  1| 2.0|   1.5|
# # |  2| 3.0|   6.0|
# # |  2| 5.0|   6.0|
# # |  2|10.0|   6.0|
# # +---+----+------+


In [1]:
#some
