# PySpark 102

### Spark SQL

October 2020

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

<img src='img/sparksql.png'>

In [2]:
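from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()                      # low-level entry point (RDDs)
ss = SparkSession.builder.getOrCreate()  # Spark SQL entry point; reuses the existing sc

ss.catalog.clearCache()                  # drop any cached tables/DataFrames
sc.setLogLevel("OFF")                    # silence Spark's log output

sc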

#### Some sources for creating a Spark SQL DataFrame
- An RDD
- CSV or JSON files
- A pandas DataFrame
- An S3 bucket
- Parquet files

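#### There are two main ways to create a Spark SQL DataFrame from existing data
- `rdd.toDF()`, called on an RDD
- `ss.createDataFrame()`, called on the SparkSession (accepts an RDD, a list of rows, or a pandas DataFrame)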

#### From an RDD

In [18]:
rdd = sc.textFile("data/iris.csv")
rdd = rdd.map(lambda row: row.split(","))
rdd.top(2)

[['7.9', '3.8', '6.4', '2.0', 'virginica'],
 ['7.7', '3.8', '6.7', '2.2', 'virginica']]

In [19]:
df = rdd.toDF(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species'])
df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



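#### From a CSV file
- You can read multiple CSV files at once by passing a folder path (or a list of paths) instead of a single file.
- `option("header", True)` tells Spark to use the first line of the file as column names. `data/iris.csv` has no header row, so in the output below the first data row (`5.1,3.5,1.4,0.2,setosa`) becomes the column names and is lost as data.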

In [20]:
df = ss.read.option("header",True).csv("data/iris.csv")
df.show(5)

+---+---+---+---+------+
|5.1|3.5|1.4|0.2|setosa|
+---+---+---+---+------+
|4.9|3.0|1.4|0.2|setosa|
|4.7|3.2|1.3|0.2|setosa|
|4.6|3.1|1.5|0.2|setosa|
|5.0|3.6|1.4|0.2|setosa|
|5.4|3.9|1.7|0.4|setosa|
+---+---+---+---+------+
only showing top 5 rows



In [21]:
# All columns are strings and nullable, and the column names come from the first data row. This is not what we want.
df.printSchema()

root
 |-- 5.1: string (nullable = true)
 |-- 3.5: string (nullable = true)
 |-- 1.4: string (nullable = true)
 |-- 0.2: string (nullable = true)
 |-- setosa: string (nullable = true)



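A more reliable approach is to build the DataFrame with `createDataFrame()` and an explicit schema. The schema is a `StructType` with one `StructField(name, dataType, nullable)` per column, so every column gets the intended name and type.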

In [22]:
from pyspark.sql.types import *

dfSchema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("species", StringType(), True),
])

In [24]:
rdd = sc.textFile("data/iris.csv")
rdd = rdd.map(lambda row: row.split(","))
rdd = rdd.map(lambda row: (float(row[0]),float(row[1]),float(row[2]),float(row[3]),row[4]))

df = ss.createDataFrame(rdd, dfSchema)
df.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



#### DataFrame API Basics

In [25]:
df.select("species").show(2)

+-------+
|species|
+-------+
| setosa|
| setosa|
+-------+
only showing top 2 rows



In [26]:
df.drop("species").show(2)

+------------+-----------+------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|
+------------+-----------+------------+-----------+
|         5.1|        3.5|         1.4|        0.2|
|         4.9|        3.0|         1.4|        0.2|
+------------+-----------+------------+-----------+
only showing top 2 rows



In [27]:
df.withColumnRenamed("species","zpecies").show(2)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|zpecies|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 2 rows



In [28]:
df.where(df['species'].isNotNull()).show(2)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 2 rows



In [29]:
from pyspark.sql.functions import col, max, min, udf, upper  # note: max and min shadow Python's built-ins

df.withColumn("new_species",upper(df['species'])).show(2)

+------------+-----------+------------+-----------+-------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|species|new_species|
+------------+-----------+------------+-----------+-------+-----------+
|         5.1|        3.5|         1.4|        0.2| setosa|     SETOSA|
|         4.9|        3.0|         1.4|        0.2| setosa|     SETOSA|
+------------+-----------+------------+-----------+-------+-----------+
only showing top 2 rows



These are only a few of the available DataFrame operations; see the [pyspark.sql API documentation](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html) for more.

#### DataFrame API: SQL-style operations

In [30]:
df.groupBy('species').count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



In [33]:
df.groupBy('species').avg('sepal_length').sort('avg(sepal_length)', ascending=False).show()

+----------+-----------------+
|   species|avg(sepal_length)|
+----------+-----------------+
| virginica|6.587999999999998|
|versicolor|            5.936|
|    setosa|5.005999999999999|
+----------+-----------------+



In [34]:
df.filter(df.species=='virginica').show(2)

+------------+-----------+------------+-----------+---------+
|sepal_length|sepal_width|petal_length|petal_width|  species|
+------------+-----------+------------+-----------+---------+
|         6.3|        3.3|         6.0|        2.5|virginica|
|         5.8|        2.7|         5.1|        1.9|virginica|
+------------+-----------+------------+-----------+---------+
only showing top 2 rows



In [45]:
df.groupBy('species').agg(max(col('sepal_length')),min(col('petal_length'))).show()

+----------+-----------------+-----------------+
|   species|max(sepal_length)|min(petal_length)|
+----------+-----------------+-----------------+
| virginica|              7.9|              4.5|
|versicolor|              7.0|              3.0|
|    setosa|              5.8|              1.0|
+----------+-----------------+-----------------+



In [46]:
df.collect()

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='setosa'),
 Row(sepal_length=4.9, sepal_width=3.0, petal_length=1.4, petal_width=0.2, species='setosa'),
 Row(sepal_length=4.7, sepal_width=3.2, petal_length=1.3, petal_width=0.2, species='setosa'),
 Row(sepal_length=4.6, sepal_width=3.1, petal_length=1.5, petal_width=0.2, species='setosa'),
 Row(sepal_length=5.0, sepal_width=3.6, petal_length=1.4, petal_width=0.2, species='setosa'),
 Row(sepal_length=5.4, sepal_width=3.9, petal_length=1.7, petal_width=0.4, species='setosa'),
 Row(sepal_length=4.6, sepal_width=3.4, petal_length=1.4, petal_width=0.3, species='setosa'),
 Row(sepal_length=5.0, sepal_width=3.4, petal_length=1.5, petal_width=0.2, species='setosa'),
 Row(sepal_length=4.4, sepal_width=2.9, petal_length=1.4, petal_width=0.2, species='setosa'),
 Row(sepal_length=4.9, sepal_width=3.1, petal_length=1.5, petal_width=0.1, species='setosa'),
 Row(sepal_length=5.4, sepal_width=3.7, petal_length=1.5, pe

In [47]:
df.join(df, "species", 'outer').show(1)

+---------+------------+-----------+------------+-----------+------------+-----------+------------+-----------+
|  species|sepal_length|sepal_width|petal_length|petal_width|sepal_length|sepal_width|petal_length|petal_width|
+---------+------------+-----------+------------+-----------+------------+-----------+------------+-----------+
|virginica|         6.3|        3.3|         6.0|        2.5|         6.3|        3.3|         6.0|        2.5|
+---------+------------+-----------+------------+-----------+------------+-----------+------------+-----------+
only showing top 1 row



#### UDF (User-defined functions)

In [36]:
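def return_sepal_bucket(sepal):
    # Spark passes SQL NULL as None; comparing None < 6 would raise a TypeError
    if sepal is None:
        return None
    if sepal < 6:
        return "small"
    return "big"

return_sepal_bucket_udf = udf(return_sepal_bucket, StringType())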

In [50]:
df.select("sepal_length", return_sepal_bucket_udf("sepal_length"), return_sepal_bucket_udf("sepal_width").alias("WIDTH")).show(2)

+------------+---------------------------------+-----+
|sepal_length|return_sepal_bucket(sepal_length)|WIDTH|
+------------+---------------------------------+-----+
|         5.1|                            small|small|
|         4.9|                            small|small|
+------------+---------------------------------+-----+
only showing top 2 rows



#### Saving data

In [51]:
df.write.mode("overwrite").parquet("ex02_output")  # overwrite so the cell can be re-run

In [52]:
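df.write.mode("overwrite").option("header", True).csv("ex02.csv")  # creates a folder of part files, not a single file
# Convert Parquet to CSV: ss.read.parquet(<parquet path>).write.csv(<csv path>)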

In [55]:
df.rdd.top(5)

[Row(sepal_length=7.9, sepal_width=3.8, petal_length=6.4, petal_width=2.0, species='virginica'),
 Row(sepal_length=7.7, sepal_width=3.8, petal_length=6.7, petal_width=2.2, species='virginica'),
 Row(sepal_length=7.7, sepal_width=3.0, petal_length=6.1, petal_width=2.3, species='virginica'),
 Row(sepal_length=7.7, sepal_width=2.8, petal_length=6.7, petal_width=2.0, species='virginica'),
 Row(sepal_length=7.7, sepal_width=2.6, petal_length=6.9, petal_width=2.3, species='virginica')]

In [53]:
df.toPandas()

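| | sepal_length | sepal_width | petal_length | petal_width | species |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |
| ... | ... | ... | ... | ... | ... |
| 145 | 6.7 | 3.0 | 5.2 | 2.3 | virginica |
| 146 | 6.3 | 2.5 | 5.0 | 1.9 | virginica |
| 147 | 6.5 | 3.0 | 5.2 | 2.0 | virginica |
| 148 | 6.2 | 3.4 | 5.4 | 2.3 | virginica |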


In [None]:
sc.stop()  # Don't forget to stop the SparkContext to release its resources

# Secret

Koalas üê®

- https://medium.com/@vivianamarquez/is-koalas-the-new-pandas-843a0f7b9003
- https://github.com/vivianamarquez/ALOBH/blob/master/Demos/Koalas.ipynb

Mischa: Check this: https://github.com/vivianamarquez/Namremmiz/blob/master/Jupyter%20Notebooks/PySpark%20Code/MasterPySpark.py