Spark SQL is a Spark module built on top of Spark Core. It was introduced in Spark 1.0 (2014), and its DataFrame API has been available since Spark 1.3 (2015).

● It is used to process structured data, represented as DataFrames.

● As such, it offers a higher level of abstraction and simplicity than RDDs, including:

○ Manipulating data through DataFrame objects.

○ Writing and executing SQL queries.

○ More advanced optimizations, performed by Spark's Catalyst query optimizer.

● It is one of the most widely used parts of Spark today.


# Step 1: Installing PySpark

* pip install pyspark

In [1]:
! pip install pyspark



In [None]:
# Plain imports fail fast if a package is missing, instead of hiding the error
import pandas as pd  # only needed for pandas <-> Spark conversions
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) a local SparkSession that uses all available CPU cores
spark = (SparkSession.builder
         .appName("MyProcess")
         .master("local[*]")
         .getOrCreate())

In [4]:
spark

# Step 2: Understanding DataFrames in PySpark
There are four main ways to create DataFrames with Spark SQL:
● Programmatically: from a list of values, a list of tuples, or a list of dictionaries.

● From a pandas DataFrame (only available with the Python API).

● From a Spark RDD: by defining the structure (schema) of the data.

● From data sources: Spark SQL can read external files through several methods:

* JSON files (spark.read.json("...") method; by default, every line must be a complete JSON document)
* Parquet files (spark.read.parquet("...") method)
* Other kinds of files/data sources, through the generic method
spark.read.format("...")...load("...")


In [5]:
header = ["city", "type", "price"]

# Split each comma-separated string and convert the price field to float
data = map(lambda r: (r[0], r[1], float(r[2])),
           map(lambda x: x.split(","),
               ["Paris,Food,19.00", "Marseille,Clothing,12.00",
                "Paris,Food,8.00", "Paris,Clothing,15.00",
                "Marseille,Food,20.00", "Lyon,Book,10.00"]))

df = spark.createDataFrame(data, header)
df.show()

+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+



In [6]:
df.take(2)

[Row(city='Paris', type='Food', price=19.0),
 Row(city='Marseille', type='Clothing', price=12.0)]

In [7]:
df.printSchema()

root
 |-- city: string (nullable = true)
 |-- type: string (nullable = true)
 |-- price: double (nullable = true)



In [8]:
df

DataFrame[city: string, type: string, price: double]

In [9]:
df.dtypes

[('city', 'string'), ('type', 'string'), ('price', 'double')]

In [10]:
df.explain() 

== Physical Plan ==
Scan ExistingRDD[city#0,type#1,price#2]


In [11]:
df.select("city").show()

+---------+
|     city|
+---------+
|    Paris|
|Marseille|
|    Paris|
|    Paris|
|Marseille|
|     Lyon|
+---------+



In [12]:
df.select(["city", "type"]).show()

+---------+--------+
|     city|    type|
+---------+--------+
|    Paris|    Food|
|Marseille|Clothing|
|    Paris|    Food|
|    Paris|Clothing|
|Marseille|    Food|
|     Lyon|    Book|
+---------+--------+



In [13]:
df.select(["city", "price"]).take(3)

[Row(city='Paris', price=19.0),
 Row(city='Marseille', price=12.0),
 Row(city='Paris', price=8.0)]

# Step 3: Basic Manipulation: Defining a Schema


In [None]:
from pyspark.sql.types import StringType, FloatType, StructType, StructField

In [None]:
# map() returns a one-shot iterator that createDataFrame already consumed, so rebuild the data
data = map(lambda r: (r[0], r[1], float(r[2])),
           map(lambda x: x.split(","),
               ["Paris,Food,19.00", "Marseille,Clothing,12.00",
                "Paris,Food,8.00", "Paris,Clothing,15.00",
                "Marseille,Food,20.00", "Lyon,Book,10.00"]))

In [None]:
schema = StructType([
                     StructField("city", StringType(), nullable=True),
                     StructField("type", StringType(), nullable=True),
                     StructField("price", FloatType(), nullable=True)
])


In [None]:
df = spark.createDataFrame(
    data,
    schema=schema
)

In [34]:
df.show()

+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+



# Step 4: Filtering Rows
* Let's learn about filter() and related functions in PySpark.

In [35]:
df.filter(df.city == "Paris").show()

+-----+--------+-----+
| city|    type|price|
+-----+--------+-----+
|Paris|    Food| 19.0|
|Paris|    Food|  8.0|
|Paris|Clothing| 15.0|
+-----+--------+-----+



In [36]:
df.filter(df.type == "Food").filter(df.city == "Paris").show()

+-----+----+-----+
| city|type|price|
+-----+----+-----+
|Paris|Food| 19.0|
|Paris|Food|  8.0|
+-----+----+-----+



In [37]:
df.filter(
    (df.city == "Paris") & (df.type == "Food")
).show()

+-----+----+-----+
| city|type|price|
+-----+----+-----+
|Paris|Food| 19.0|
|Paris|Food|  8.0|
+-----+----+-----+



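# Step 5: Sorting
* Let's learn about orderBy statements. Note that chaining orderBy calls does not produce a multi-level sort: only the last call determines the order (compare the outputs of In [39] and In [40]).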

In [38]:
df.show()

+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+



In [39]:
df.orderBy(df.city).show()

+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|     Lyon|    Book| 10.0|
|Marseille|Clothing| 12.0|
|Marseille|    Food| 20.0|
|    Paris|    Food| 19.0|
|    Paris|Clothing| 15.0|
|    Paris|    Food|  8.0|
+---------+--------+-----+



In [40]:
df.orderBy(df.city).orderBy(df.type).show()

+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|     Lyon|    Book| 10.0|
|Marseille|Clothing| 12.0|
|    Paris|Clothing| 15.0|
|    Paris|    Food| 19.0|
|Marseille|    Food| 20.0|
|    Paris|    Food|  8.0|
+---------+--------+-----+



# Step 6: Manipulating Columns with PySpark

In [41]:
from pyspark.sql.functions import lit
# lit() wraps a constant as a Column, so every row gets the value 6
df = df.withColumn("six", lit(6))
df.show()

+---------+--------+-----+---+
|     city|    type|price|six|
+---------+--------+-----+---+
|    Paris|    Food| 19.0|  6|
|Marseille|Clothing| 12.0|  6|
|    Paris|    Food|  8.0|  6|
|    Paris|Clothing| 15.0|  6|
|Marseille|    Food| 20.0|  6|
|     Lyon|    Book| 10.0|  6|
+---------+--------+-----+---+



In [42]:
df.withColumn("divide", df.price / 2 ).show()

+---------+--------+-----+---+------+
|     city|    type|price|six|divide|
+---------+--------+-----+---+------+
|    Paris|    Food| 19.0|  6|   9.5|
|Marseille|Clothing| 12.0|  6|   6.0|
|    Paris|    Food|  8.0|  6|   4.0|
|    Paris|Clothing| 15.0|  6|   7.5|
|Marseille|    Food| 20.0|  6|  10.0|
|     Lyon|    Book| 10.0|  6|   5.0|
+---------+--------+-----+---+------+



In [43]:
df.withColumn("sum", df.six + df.six).show()

+---------+--------+-----+---+---+
|     city|    type|price|six|sum|
+---------+--------+-----+---+---+
|    Paris|    Food| 19.0|  6| 12|
|Marseille|Clothing| 12.0|  6| 12|
|    Paris|    Food|  8.0|  6| 12|
|    Paris|Clothing| 15.0|  6| 12|
|Marseille|    Food| 20.0|  6| 12|
|     Lyon|    Book| 10.0|  6| 12|
+---------+--------+-----+---+---+



In [44]:
df.drop("six").show()

+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+



# Step 7: Advanced Manipulations
* Grouping and aggregating data with groupBy() and agg()


In [71]:
header = ["city", "type", "price"]

data = map(lambda r: (r[0], r[1], float(r[2])),
           map(lambda x: x.split(","),
               ["Paris,Food,19.00", "Marseille,Clothing,12.00",
                "Paris,Food,8.00", "Paris,Clothing,15.00",
                "Marseille,Food,20.00", "Lyon,Book,10.00"]))

df = spark.createDataFrame(data, header)
df.show()

# sum is imported under an alias so it does not shadow Python's built-in sum()
from pyspark.sql.functions import sum as _sum
df.groupBy("city").agg(_sum("price")).show()

+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+

+---------+----------+
|     city|sum(price)|
+---------+----------+
|Marseille|      32.0|
|    Paris|      42.0|
|     Lyon|      10.0|
+---------+----------+



# Step 8: Custom Functions
* User-defined functions (UDFs)

In [72]:
# Keep the DataFrame and display it separately: show() returns None
df = spark.createDataFrame([[1, 2], [2, 3], [3, 4], [4, 5]], ["x1", "x2"])
df.show()

+---+---+
| x1| x2|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
|  4|  5|
+---+---+



In [97]:

from pyspark.sql.functions import udf
from pyspark.sql.types import *

l = [('Alice', 1)]
# Without column names, Spark names the columns _1, _2, ...
df = spark.createDataFrame(l)
df.show()


+-----+---+
|   _1| _2|
+-----+---+
|Alice|  1|
+-----+---+

