In [1]:
import pyspark

# getOrCreate() builds a SparkSession the first time it runs; every later call
# hands back that same session instead of starting a new one.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

In [3]:
import multiprocessing

# Number of CPUs reported for this machine; the bare name on the last line
# makes Jupyter display the value as the cell output.
nprocs = multiprocessing.cpu_count()
nprocs

4

In [4]:
import pyspark.sql.functions as F

While Spark DataFrames may superficially look like pandas DataFrames, and even share some of the same methods and syntax, it is important to keep in mind that they are two separate types of objects. Spark and pandas code that looks similar often behaves very differently; for example, Spark is lazy and only computes results when an action such as `.show()` asks for them.

In [5]:
import pandas as pd
import numpy as np

In [6]:
# Seed NumPy's global RNG so the random group labels are reproducible.
np.random.seed(456)

# Small pandas frame (20 rows: an integer `n` and a random label drawn from a/b/c)
# that is converted into a Spark DataFrame in a later cell.
pandas_dataframe = pd.DataFrame({
    'n': np.arange(20),
    'group': np.random.choice(list('abc'), 20),
})
pandas_dataframe

Unnamed: 0,n,group
0,0,b
1,1,b
2,2,c
3,3,a
4,4,c
5,5,c
6,6,a
7,7,b
8,8,a
9,9,b


In [7]:
df = spark.createDataFrame(data=pandas_dataframe)
# Evaluating `df` on its own only prints its schema; an action such as .show()
# is needed to see actual rows.
df

DataFrame[n: bigint, group: string]

In [8]:
df.show(n=5)  # print only the first 5 rows of the Spark DataFrame

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
+---+-----+
only showing top 5 rows



In [10]:
# Summary statistics (count, mean, stddev, min, max) for every column, printed via .show().
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



In [11]:
from pydataset import data

# Load the `mpg` dataset through pydataset and convert it into a Spark DataFrame.
mpg = spark.createDataFrame(
    data('mpg'),
)
mpg.show(n=5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



#### `.select` lets us choose which columns (or column expressions) appear in the resulting DataFrame

In [13]:
mpg.select(
    mpg.hwy,
    mpg.cty,
    mpg.model,
).show()  # no row count given, so Spark prints its default of 20 rows

+---+---+------------------+
|hwy|cty|             model|
+---+---+------------------+
| 29| 18|                a4|
| 29| 21|                a4|
| 31| 20|                a4|
| 30| 21|                a4|
| 26| 16|                a4|
| 26| 18|                a4|
| 27| 18|                a4|
| 26| 18|        a4 quattro|
| 25| 16|        a4 quattro|
| 28| 20|        a4 quattro|
| 27| 19|        a4 quattro|
| 25| 15|        a4 quattro|
| 25| 17|        a4 quattro|
| 25| 17|        a4 quattro|
| 25| 15|        a4 quattro|
| 24| 15|        a6 quattro|
| 25| 17|        a6 quattro|
| 23| 16|        a6 quattro|
| 20| 14|c1500 suburban 2wd|
| 15| 11|c1500 suburban 2wd|
+---+---+------------------+
only showing top 20 rows



In [16]:
# Arithmetic on a column yields a new column; Spark names it after the expression, "(hwy + 1)".
mpg.select(
    mpg.hwy,
    mpg.hwy + 1,
).show(n=5)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
| 30|       31|
| 26|       27|
+---+---------+
only showing top 5 rows



#### Use the `.alias` method to rename a column

In [17]:
mpg.select(
    mpg.hwy.alias("highway_mileage"),  # .alias sets the output column name
).show(n=5)

+---------------+
|highway_mileage|
+---------------+
|             29|
|             29|
|             31|
|             30|
|             26|
+---------------+
only showing top 5 rows



#### store column objects in variables and reference them

In [18]:
# Column expressions are ordinary Python objects: build them once, then pass them to select.
col1 = mpg.hwy.alias('highway_mileage')
# Spark's `/` produces a double even for integer columns (e.g. 15.0 in the output).
col2 = (mpg.hwy / 2).alias("highway_mileage+halved")
mpg.select(col1, col2).show(5)

+--------------+----------------------+
|higway_mileage|highway_mileage+halved|
+--------------+----------------------+
|            29|                  14.5|
|            29|                  14.5|
|            31|                  15.5|
|            30|                  15.0|
|            26|                  13.0|
+--------------+----------------------+
only showing top 5 rows



#### Create columns with the `col` and `expr` functions from the `pyspark.sql.functions` module

In [19]:
from pyspark.sql.functions import col, expr

In [20]:
# col() builds a Column object from a column name; displaying it shows the Column repr, not data.
col('hwy')

Column<b'hwy'>

#### mix and match the syntax

In [24]:
# A precomputed column expression; the name must match its use in the select below.
avg_column = (col('hwy') + col('cty')) / 2

# Mix col(), attribute access, and a stored column expression in one select.
mpg.select(
    col('hwy').alias('highway_mileage'),
    mpg.cty.alias('city_mileage'),
    avg_column.alias('avg-mileage'),
).show(5)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg-mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
|             31|          20|       25.5|
|             30|          21|       25.5|
|             26|          16|       21.0|
+---------------+------------+-----------+
only showing top 5 rows



#### `expr` does everything `col` does and more: it returns the same type of Column object, but lets us express manipulations of the column (arithmetic, aliases) inside the string that defines it.

In [26]:
mpg.select(
    expr('hwy'),                             # plain column reference, same as col('hwy')
    expr('hwy + 1'),                         # arithmetic written inside the expression string
    expr('hwy AS highway_mileage'),          # alias written in SQL syntax
    expr('hwy + 1 AS highway_incremented'),  # arithmetic and alias together
).show(n=5)

+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



#### These all produce the same column, named `highway`:
- `mpg.hwy.alias("highway")`
- `col("hwy").alias("highway")`
- `expr("hwy").alias("highway")`
- `expr("hwy AS highway")`

#### Spark SQL lets us write SQL queries against our Spark DataFrames. To start using Spark SQL, we first "register" the DataFrame with Spark as a table (a temporary view).

In [29]:
# Register the `mpg` DataFrame as a temporary view named "mpg" so spark.sql() queries can use it.
mpg.createOrReplaceTempView(name='mpg')

In [33]:
# Query the registered "mpg" view with SQL and print the first 5 rows of the result.
(
    spark
    .sql("""
SELECT hwy, cty, (hwy + cty) / 2 AS avg 
FROM mpg
""")
    .show(n=5)
)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



In [35]:
# List of (column name, type name) pairs for every column.
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [36]:
# Print the schema as a tree: column names, types, and nullability.
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [37]:
# Cast a column to another type; the printed schema shows hwy is now a string.
# If a value cannot be converted, Spark puts null in its place
# (when ANSI SQL mode is off; with ANSI mode on, an invalid cast raises an error instead).
mpg.select(
    mpg.hwy.cast('string'),
).printSchema()

root
 |-- hwy: string (nullable = true)



### Basic built-in functions

concat, sum, avg, min, max

Note that importing the `sum` function directly will shadow Python's built-in `sum` function. This means you will get an error if you try to sum a list of numbers: `sum` will then refer to the PySpark `sum` function, which works with PySpark DataFrame columns, while the built-in `sum` works with lists of numbers. The same holds true for the built-in `min` and `max` functions.

In [38]:
# NOTE: this wildcard import puts every pyspark.sql.functions name in the notebook namespace,
# including names such as sum, min and max that shadow Python's built-ins of the same name.
# Explicit imports, or the `F.` prefix from `import pyspark.sql.functions as F`, avoid that.
from pyspark.sql.functions import * #Can also import specific functions

In [42]:
# Call Spark's aggregate functions through the `F.` prefix so this cell does not depend on
# the wildcard import having replaced Python's built-in sum/min/max.
mpg.select(
    F.sum(mpg.hwy),
    F.avg(mpg.hwy).alias('Highway Average'),
    F.min(mpg.hwy),
    F.max(mpg.hwy),
).show()

+--------+-----------------+--------+--------+
|sum(hwy)|  Highway Average|min(hwy)|max(hwy)|
+--------+-----------------+--------+--------+
|    5485|23.44017094017094|      12|      44|
+--------+-----------------+--------+--------+



In [45]:
# concat joins the values with no separator (e.g. "audia4").
mpg.select(
    concat(mpg.manufacturer, mpg.model),
).show(n=10)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|             audia4 quattro|
|             audia4 quattro|
|             audia4 quattro|
+---------------------------+
only showing top 10 rows



In [46]:
# lit() wraps a constant value as a Spark column so it can be combined with real columns;
# concat also turns the numeric cyl value into text (e.g. "4cylinders").
mpg.select(
    concat(mpg.cyl, lit('cylinders')),
).show(n=5)

+----------------------+
|concat(cyl, cylinders)|
+----------------------+
|            4cylinders|
|            4cylinders|
|            4cylinders|
|            4cylinders|
|            6cylinders|
+----------------------+
only showing top 5 rows



In [47]:
# A small Spark DataFrame of street addresses, built via pandas, used by the regex examples.
textdf = spark.createDataFrame(
    pd.DataFrame(dict(
        address=[
            "600 Navarro St ste 600, San Antonio, TX 78205",
            "3130 Broadway St, San Antonio, TX 78209",
            "303 Pearl Pkwy, San Antonio, TX 78215",
            "1255 SW Loop 410, San Antonio, TX 78227",
        ],
    ))
)

In [48]:
textdf.show(truncate = False) # truncate=False prints every value in full instead of cutting long strings short

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



In [49]:
# Default display: long strings are cut short and end with "..."
textdf.show()

+--------------------+
|             address|
+--------------------+
|600 Navarro St st...|
|3130 Broadway St,...|
|303 Pearl Pkwy, S...|
|1255 SW Loop 410,...|
+--------------------+



#### The regexp_extract function lets us specify at least one capture group, and create a new column based on the contents of the capture group.

In [50]:
# regexp_extract(column, pattern, group) returns the text matched by the given capture group.
textdf.select(
    'address',
    # group 1: the run of digits at the start of the address
    regexp_extract('address', r"^(\d+)", 1).alias('street_no'),
    # group 1: the words after the number, lazily matched up to the first comma
    regexp_extract('address', r"^\d+\s([\w\s]+?),", 1).alias('street'),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+

