# Concatenating Tables with Set-Like Operations in `pyspark`

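Now let's look at combining tables in `pyspark` with the set-like operations `UNION`, `INTERSECT`, and `EXCEPT`, which are available as `DataFrame` methods such as `union`, `intersect`, and `exceptAll`.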

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('Ops')
         .config('spark.driver.host', 'localhost')
         .getOrCreate())
```

## Example - Auto Sales in Spark

```python
from more_pyspark import to_pandas

sales_aprk = spark.read.csv("./data/auto_sales_apr.csv", header=True, inferSchema=True)
sales_aprk.collect() >> to_pandas
```

|   | Salesperson | Compact | Sedan | SUV | Truck |
|---|---|---|---|---|---|
| 0 | Ann | 22 | 18 | 15 | 12 |
| 1 | Bob | 20 | 14 | 6 | 24 |
| 2 | Yolanda | 19 | 10 | 28 | 17 |
| 3 | Xerxes | 11 | 27 | 17 | 9 |


```python
sales_mayk = spark.read.csv("./data/auto_sales_may.csv", header=True, inferSchema=True)
sales_mayk.collect() >> to_pandas
```

|   | Salesperson | Compact | Sedan | SUV | Truck |
|---|---|---|---|---|---|
| 0 | Ann | 22 | 18 | 15 | 12 |
| 1 | Bob | 19 | 12 | 17 | 20 |
| 2 | Yolanda | 19 | 8 | 32 | 15 |
| 3 | Xerxes | 12 | 23 | 18 | 9 |


## `UNION ALL` in `pyspark`

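Both `union` and `unionAll` are actually `UNION ALL`: duplicate rows are kept. Both also match columns by position, not by name, so use `unionByName` when the column order of the two tables might differ.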

```python
(sales_aprk
 .union(sales_mayk)
 .collect()
) >> to_pandas
```

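|   | Salesperson | Compact | Sedan | SUV | Truck |
|---|---|---|---|---|---|
| 0 | Ann | 22 | 18 | 15 | 12 |
| 1 | Bob | 20 | 14 | 6 | 24 |
| 2 | Yolanda | 19 | 10 | 28 | 17 |
| 3 | Xerxes | 11 | 27 | 17 | 9 |
| 4 | Ann | 22 | 18 | 15 | 12 |
| 5 | Bob | 19 | 12 | 17 | 20 |
| 6 | Yolanda | 19 | 8 | 32 | 15 |
| 7 | Xerxes | 12 | 23 | 18 | 9 |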

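To make the `UNION ALL` semantics concrete without a Spark session, here is a plain-Python model (not Spark's implementation) that treats each row as a tuple, using the April and May rows shown above. The helper name `union_all` is hypothetical. The second half illustrates why positional column matching matters, which is the problem `unionByName` avoids.

```python
from collections import Counter

# Plain-Python model: each row is a tuple in column order
# (Salesperson, Compact, Sedan, SUV, Truck), copied from the tables above.
apr = [("Ann", 22, 18, 15, 12), ("Bob", 20, 14, 6, 24),
       ("Yolanda", 19, 10, 28, 17), ("Xerxes", 11, 27, 17, 9)]
may = [("Ann", 22, 18, 15, 12), ("Bob", 19, 12, 17, 20),
       ("Yolanda", 19, 8, 32, 15), ("Xerxes", 12, 23, 18, 9)]

def union_all(left, right):
    """UNION ALL: every row from both inputs, duplicates kept."""
    return left + right

combined = union_all(apr, may)
print(len(combined))                                # 8
print(Counter(combined)[("Ann", 22, 18, 15, 12)])  # 2: Ann's identical row appears twice

# Positional matching: if the second table stores Sedan before Compact,
# pairing by position puts its values under the wrong headers.
header_left = ("Salesperson", "Compact", "Sedan")
header_right = ("Salesperson", "Sedan", "Compact")
row_right = ("Ann", 18, 22)  # Sedan=18, Compact=22 in the right table's own order
by_position = dict(zip(header_left, row_right))  # what positional union does
by_name = dict(zip(header_right, row_right))     # what name matching does
print(by_position["Compact"], by_name["Compact"])  # 18 22
```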

## `UNION/UNION DISTINCT` in `pyspark`

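Chain `distinct` after `union` to get the usual `UNION`/`UNION DISTINCT`, which drops duplicate rows. Note that `distinct` makes no guarantee about row order, which is why the rows below come back shuffled.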

```python
(sales_aprk
 .union(sales_mayk)
 .distinct()
 .collect()
) >> to_pandas
```

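|   | Salesperson | Compact | Sedan | SUV | Truck |
|---|---|---|---|---|---|
| 0 | Xerxes | 12 | 23 | 18 | 9 |
| 1 | Bob | 20 | 14 | 6 | 24 |
| 2 | Yolanda | 19 | 8 | 32 | 15 |
| 3 | Ann | 22 | 18 | 15 | 12 |
| 4 | Bob | 19 | 12 | 17 | 20 |
| 5 | Yolanda | 19 | 10 | 28 | 17 |
| 6 | Xerxes | 11 | 27 | 17 | 9 |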

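As a plain-Python sketch of the same idea (a model of the semantics, not Spark's implementation), `UNION DISTINCT` is `UNION ALL` followed by de-duplication. The helper name `union_distinct` is hypothetical; unlike Spark's `distinct`, this version happens to keep first-seen order.

```python
# Plain-Python model of UNION DISTINCT; rows copied from the April/May tables.
apr = [("Ann", 22, 18, 15, 12), ("Bob", 20, 14, 6, 24),
       ("Yolanda", 19, 10, 28, 17), ("Xerxes", 11, 27, 17, 9)]
may = [("Ann", 22, 18, 15, 12), ("Bob", 19, 12, 17, 20),
       ("Yolanda", 19, 8, 32, 15), ("Xerxes", 12, 23, 18, 9)]

def union_distinct(left, right):
    """Concatenate both inputs, then keep the first occurrence of each row."""
    # dict.fromkeys drops repeated keys while preserving insertion order.
    return list(dict.fromkeys(left + right))

result = union_distinct(apr, may)
print(len(result))  # 7: the two identical Ann rows collapse into one
```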

## Adding a `month` column

As mentioned before, we really should add a `month` column here. Because `withColumn` expects a `Column` expression rather than a plain Python value, we use `lit` to wrap each month name as a *literal constant* column.

```python
from pyspark.sql.functions import lit

(sales_aprk.withColumn('month', lit('April'))
 .union(sales_mayk.withColumn('month', lit('May')))
 .collect()
) >> to_pandas
```

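|   | Salesperson | Compact | Sedan | SUV | Truck | month |
|---|---|---|---|---|---|---|
| 0 | Ann | 22 | 18 | 15 | 12 | April |
| 1 | Bob | 20 | 14 | 6 | 24 | April |
| 2 | Yolanda | 19 | 10 | 28 | 17 | April |
| 3 | Xerxes | 11 | 27 | 17 | 9 | April |
| 4 | Ann | 22 | 18 | 15 | 12 | May |
| 5 | Bob | 19 | 12 | 17 | 20 | May |
| 6 | Yolanda | 19 | 8 | 32 | 15 | May |
| 7 | Xerxes | 12 | 23 | 18 | 9 | May |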

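The month column also changes what counts as a duplicate. In this plain-Python model (a sketch, not Spark's implementation; `add_constant` is a hypothetical stand-in for `withColumn(..., lit(...))`), Ann's April and May rows are identical until they are tagged, so a distinct union of the tagged rows keeps all eight.

```python
# Plain-Python model; rows copied from the April/May tables.
apr = [("Ann", 22, 18, 15, 12), ("Bob", 20, 14, 6, 24),
       ("Yolanda", 19, 10, 28, 17), ("Xerxes", 11, 27, 17, 9)]
may = [("Ann", 22, 18, 15, 12), ("Bob", 19, 12, 17, 20),
       ("Yolanda", 19, 8, 32, 15), ("Xerxes", 12, 23, 18, 9)]

def add_constant(rows, value):
    """Append the same constant to every row (the role lit() plays)."""
    return [row + (value,) for row in rows]

untagged = apr + may
tagged = add_constant(apr, "April") + add_constant(may, "May")

print(len(set(untagged)))  # 7: Ann's April and May rows are indistinguishable
print(len(set(tagged)))    # 8: the month value keeps them apart
```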

## Performing `INTERSECT`

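Note that `intersect` and `intersectAll` are not synonymous: `intersect` behaves like SQL `INTERSECT` and removes duplicate rows, while `intersectAll` behaves like `INTERSECT ALL` and keeps them. They give the same result here only because neither table contains duplicate rows.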

```python
sales_aprk.intersect(sales_mayk).collect() >> to_pandas
```

|   | Salesperson | Compact | Sedan | SUV | Truck |
|---|---|---|---|---|---|
| 0 | Ann | 22 | 18 | 15 | 12 |

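The difference only shows up when a row is repeated. Here is a plain-Python model of the two behaviors (a sketch of the semantics, not Spark's implementation), where the strings `"a"`, `"b"`, `"c"` stand in for whole rows and the helper names are hypothetical. `Counter`'s `&` keeps each element `min(count_left, count_right)` times, which is the `INTERSECT ALL` rule.

```python
from collections import Counter

# "a", "b", "c" stand in for whole rows; "a" is duplicated on both sides.
left = ["a", "a", "b"]
right = ["a", "a", "c"]

def intersect_distinct(left, right):
    """Like DataFrame.intersect: rows present in both inputs, duplicates removed."""
    return sorted(set(left) & set(right))

def intersect_all(left, right):
    """Like DataFrame.intersectAll: each row kept min(left count, right count) times."""
    return sorted((Counter(left) & Counter(right)).elements())

print(intersect_distinct(left, right))  # ['a']
print(intersect_all(left, right))       # ['a', 'a']
```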

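## Performing a set difference with `exceptAll`

`exceptAll` returns the rows of the first table that do not appear in the second, keeping duplicates (SQL `EXCEPT ALL`). Use `subtract` for the duplicate-removing `EXCEPT DISTINCT`.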

```python
sales_aprk.exceptAll(sales_mayk).collect() >> to_pandas
```

|   | Salesperson | Compact | Sedan | SUV | Truck |
|---|---|---|---|---|---|
| 0 | Yolanda | 19 | 10 | 28 | 17 |
| 1 | Xerxes | 11 | 27 | 17 | 9 |
| 2 | Bob | 20 | 14 | 6 | 24 |

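As with `intersect`, the `ALL` and `DISTINCT` versions differ only when rows repeat. This plain-Python model (a sketch, not Spark's implementation; helper names are hypothetical, and strings stand in for rows) uses `Counter` subtraction, which removes one left copy per matching right copy and drops non-positive counts, mirroring `EXCEPT ALL`.

```python
from collections import Counter

# "a" appears twice on the left but only once on the right.
left = ["a", "a", "b"]
right = ["a"]

def except_all(left, right):
    """Like DataFrame.exceptAll: remove one left copy per matching right row."""
    return sorted((Counter(left) - Counter(right)).elements())

def except_distinct(left, right):
    """Like DataFrame.subtract: distinct left rows that never appear in right."""
    return sorted(set(left) - set(right))

print(except_all(left, right))       # ['a', 'b']
print(except_distinct(left, right))  # ['b']
```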

## <font color="red"> Exercise 2 </font>

In the data folder, you will find 6 files, each containing a sample of 100,000 rows from the Uber data for one month from April to September 2014 (apr14 to sep14). Perform the following tasks:

1. Use `glob` to get all 6 file paths.
2. Use a regular expression to create a `lambda` function that pulls the month from each file path.
3. Read the 6 `pyspark` DataFrames into a `dict` whose keys are the month names and whose values are the corresponding DataFrames.
4. Use a dictionary comprehension to add a `month` column to each DataFrame.
5. Use the accumulator pattern and the `union` method to combine these 6 DataFrames into one `pyspark` DataFrame.
6. Inspect the head and compute the number of rows (use the `count` method).

```python
from glob import glob

files = glob('./data/uber-raw-data-*14-sample.csv')
files
```

```
['./data/uber-raw-data-apr14-sample.csv',
 './data/uber-raw-data-aug14-sample.csv',
 './data/uber-raw-data-jul14-sample.csv',
 './data/uber-raw-data-jun14-sample.csv',
 './data/uber-raw-data-may14-sample.csv',
 './data/uber-raw-data-sep14-sample.csv']
```

```python
import re

FILE_NAME_RE = re.compile(r'^\./data/uber-raw-data-([a-z]*)14-sample\.csv$')
file_name = lambda p: FILE_NAME_RE.match(p).group(1)
file_names = lambda files: [file_name(p) for p in files]
file_names(files)
```

```
['apr', 'aug', 'jul', 'jun', 'may', 'sep']
```

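The month list above is alphabetical, not chronological: whether `glob` results come back sorted depends on the file system, so the union built from them is not in calendar order. A minimal sketch, assuming the same path pattern, that sorts paths by calendar month; `MONTH_ORDER`, `month_of`, and `chronological` are hypothetical helpers, and `month_of` also raises a clear `ValueError` where the `lambda` version would fail with an `AttributeError` on a non-matching path.

```python
import re

# Hypothetical helper list: calendar order of three-letter month abbreviations.
MONTH_ORDER = ["jan", "feb", "mar", "apr", "may", "jun",
               "jul", "aug", "sep", "oct", "nov", "dec"]
FILE_NAME_RE = re.compile(r'^\./data/uber-raw-data-([a-z]*)14-sample\.csv$')

def month_of(path):
    """Extract the month abbreviation, failing with a clear message on a mismatch."""
    match = FILE_NAME_RE.match(path)
    if match is None:
        raise ValueError(f"unexpected file name: {path}")
    return match.group(1)

def chronological(paths):
    """Sort paths by calendar month rather than alphabetically."""
    return sorted(paths, key=lambda p: MONTH_ORDER.index(month_of(p)))

files = ['./data/uber-raw-data-apr14-sample.csv',
         './data/uber-raw-data-aug14-sample.csv',
         './data/uber-raw-data-jul14-sample.csv',
         './data/uber-raw-data-jun14-sample.csv',
         './data/uber-raw-data-may14-sample.csv',
         './data/uber-raw-data-sep14-sample.csv']
print([month_of(p) for p in chronological(files)])
# ['apr', 'may', 'jun', 'jul', 'aug', 'sep']
```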
```python
dfs = {name: spark.read.csv(path, header=True, inferSchema=True)
       for name, path in zip(file_names(files), files)}
dfs['apr'].take(5) >> to_pandas
```

|   | Date/Time | Lat | Lon | Base |
|---|---|---|---|---|
| 0 | 4/18/2014 21:38:00 | 40.7359 | -73.9852 | B02682 |
| 1 | 4/23/2014 15:19:00 | 40.7642 | -73.9543 | B02598 |
| 2 | 4/10/2014 7:15:00 | 40.7138 | -74.0103 | B02598 |
| 3 | 4/11/2014 15:23:00 | 40.7847 | -73.9698 | B02682 |
| 4 | 4/7/2014 17:26:00 | 40.646 | -73.7767 | B02598 |


```python
from pyspark.sql.functions import lit

def add_month(mo):
    return dfs[mo].withColumn('month', lit(mo))

add_month('apr').take(5) >> to_pandas
```

|   | Date/Time | Lat | Lon | Base | month |
|---|---|---|---|---|---|
| 0 | 4/18/2014 21:38:00 | 40.7359 | -73.9852 | B02682 | apr |
| 1 | 4/23/2014 15:19:00 | 40.7642 | -73.9543 | B02598 | apr |
| 2 | 4/10/2014 7:15:00 | 40.7138 | -74.0103 | B02598 | apr |
| 3 | 4/11/2014 15:23:00 | 40.7847 | -73.9698 | B02682 | apr |
| 4 | 4/7/2014 17:26:00 | 40.646 | -73.7767 | B02598 | apr |


```python
addingmonth = {f: add_month(f) for f in file_names(files)}
addingmonth['jul'].take(5) >> to_pandas
```

|   | Date/Time | Lat | Lon | Base | month |
|---|---|---|---|---|---|
| 0 | 7/29/2014 19:34:00 | 40.714 | -74.0144 | B02682 | jul |
| 1 | 7/11/2014 10:24:00 | 40.7264 | -73.9553 | B02617 | jul |
| 2 | 7/11/2014 18:44:00 | 40.7394 | -73.9912 | B02617 | jul |
| 3 | 7/30/2014 10:29:00 | 40.774 | -73.8715 | B02617 | jul |
| 4 | 7/10/2014 15:57:00 | 40.7153 | -74.0021 | B02682 | jul |


```python
addingmonth['apr'].union(addingmonth['may']).take(5) >> to_pandas
```

|   | Date/Time | Lat | Lon | Base | month |
|---|---|---|---|---|---|
| 0 | 4/18/2014 21:38:00 | 40.7359 | -73.9852 | B02682 | apr |
| 1 | 4/23/2014 15:19:00 | 40.7642 | -73.9543 | B02598 | apr |
| 2 | 4/10/2014 7:15:00 | 40.7138 | -74.0103 | B02598 | apr |
| 3 | 4/11/2014 15:23:00 | 40.7847 | -73.9698 | B02682 | apr |
| 4 | 4/7/2014 17:26:00 | 40.646 | -73.7767 | B02598 | apr |


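```python
# Accumulator pattern: start with the first month's DataFrame,
# then union each remaining month onto the running result.
month_dfs = list(addingmonth.values())
pysparkdf = month_dfs[0]
for df in month_dfs[1:]:
    pysparkdf = pysparkdf.union(df)
```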

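The accumulator loop is a left fold, so it can also be written with `functools.reduce`. A minimal sketch, where `combine` is a hypothetical helper and plain lists with `operator.add` stand in for DataFrames and `union` so the example runs without Spark. Note that `reduce` raises a `TypeError` on an empty input without an initial value, so at least one frame is required.

```python
from functools import reduce
from operator import add

def combine(frames, union):
    """Fold a non-empty iterable of frames into one using a binary union."""
    # reduce(f, [a, b, c]) computes f(f(a, b), c), exactly what the accumulator loop does.
    return reduce(union, frames)

# Lists stand in for DataFrames, and list concatenation plays the role of union.
print(combine([[1, 2], [3], [4, 5]], add))  # [1, 2, 3, 4, 5]
```

With `pyspark`, the analogous call would be `combine(addingmonth.values(), DataFrame.union)` after `from pyspark.sql import DataFrame`, since the unbound method `DataFrame.union` takes two DataFrames.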
```python
pysparkdf.take(5) >> to_pandas
```

|   | Date/Time | Lat | Lon | Base | month |
|---|---|---|---|---|---|
| 0 | 4/18/2014 21:38:00 | 40.7359 | -73.9852 | B02682 | apr |
| 1 | 4/23/2014 15:19:00 | 40.7642 | -73.9543 | B02598 | apr |
| 2 | 4/10/2014 7:15:00 | 40.7138 | -74.0103 | B02598 | apr |
| 3 | 4/11/2014 15:23:00 | 40.7847 | -73.9698 | B02682 | apr |
| 4 | 4/7/2014 17:26:00 | 40.646 | -73.7767 | B02598 | apr |


```python
pysparkdf.count()
```

```
600000
```

## Up Next

Stuff