# Advanced Spark SQL

We will begin by downloading a dataset from AirBnB with appartment listings from Amsterdam, Holland. 

Please execute the following cell to download and extract the CSV file.

In [87]:
!mkdir /home/spark/notebooks/airbnb
!wget -P /home/spark/notebooks/airbnb/ http://data.insideairbnb.com/the-netherlands/north-holland/amsterdam/2019-07-08/data/listings.csv.gz
!gunzip /home/spark/notebooks/airbnb/listings.csv.gz

mkdir: cannot create directory ‘/home/spark/notebooks/airbnb’: File exists
--2019-08-23 17:18:38--  http://data.insideairbnb.com/the-netherlands/north-holland/amsterdam/2019-07-08/data/listings.csv.gz
Resolving data.insideairbnb.com (data.insideairbnb.com)... 52.216.145.42
Connecting to data.insideairbnb.com (data.insideairbnb.com)|52.216.145.42|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18977711 (18M) [application/x-gzip]
Saving to: ‘/home/spark/notebooks/airbnb/listings.csv.gz’


2019-08-23 17:18:44 (3.86 MB/s) - ‘/home/spark/notebooks/airbnb/listings.csv.gz’ saved [18977711/18977711]



Next for some data preparation before ingesting it with Spark. This dataset contains quoted strings with newline characters inside them which Spark's CSV reader cannot handle so we will get rid of those with Python's `csv` library. The file also contains lists inside curly brackets, which we will also remove. Finally, to be on a safe side, we will use `~` characters as quotation marks.

Please execute the following cell.

In [1]:
import csv
import sys

csv.field_size_limit(sys.maxsize)
with open('/home/spark/notebooks/airbnb/listings.csv', 'r') as file, \
  open('/home/spark/notebooks/airbnb/listings-corr.csv', 'w') as outf:
    csvread = csv.reader(file, dialect='excel', quotechar='"',delimiter=',')
    for row in csvread:
        outf.write(", ".join(['~' + field.replace('~', 'approx.').replace('\r\n', '').\
                              replace('\n', '').replace('{', '').replace('}', '') + '~' for field in row])+'\n')

Now, let's create a Spark session. The only addition, compared to the previous notebooks, is the `spark.sql.execution.arrow.enabled` parameter. Pandas UDFs wouldn't function well without using the Arrow library, which is wat that parameter sets up.

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark course - Advanced SQL").\
    config("spark.sql.execution.arrow.enabled", "true").\
    master("local[*]").enableHiveSupport().getOrCreate()

sc = spark.sparkContext

Finally, let's import classes you will need in the notebook. (Notice that Spark SQL functions are available as `F`.)

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

Load the "corrected" CSV file (`/home/spark/notebooks/airbnb/listings-corr.csv`) using Spark's CSV reader. Set the `header` option to `True` (because we want Spark to read column names from the first line) and also set `ignoreLeadingWhiteSpace` option to `True` (so that Spark trims column names and String values). Set the quotation mark (`quote` parameter) to `~` and set the `escape` parameter to `{` (this will turn off escaping quotation marks within quoted strings).

In [3]:
csvdf = spark.read.csv('/home/spark/notebooks/airbnb/listings-corr.csv', header=True, quote="~", escape="{", ignoreLeadingWhiteSpace=True)

Look at the schema and the contents of the loaded DataFrame.

In [4]:
csvdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- name: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- space: string (nullable = true)
 |-- description: string (nullable = true)
 |-- experiences_offered: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- transit: string (nullable = true)
 |-- access: string (nullable = true)
 |-- interaction: string (nullable = true)
 |-- house_rules: string (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- medium_url: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- xl_picture_url: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable

In [5]:
csvdf.show()

+-----+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------+-------+--------------------+-----------+----------+--------------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+--------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+--------------------+--------------------+----------------------+----------------------------+---------+-------------+-------+---------+--------------------+------------+-----------+--------+---------+-----------------+-----------------+---------------+------------+---------+--------+----+--------+--------

One thing which is apparent is that all columns are strings. You will do some calculations later with `price` and `review_scores_rating` columns. So `cast` those two columns to `double`. You will need to first remove the "$" sign from prices which you can do with `substring_index` function.

You will create a new column with the required definition and the same name (namely, "price") and then drop the old columns. Be careful not to drop the column by name (because now you will have two columns with the same name), but by its reference. 

In [4]:
csvdf = csvdf.withColumn("price", F.substring_index(csvdf.price, '$', -1).cast('double')).drop(csvdf.price)
csvdf = csvdf.withColumn("review_scores_rating", csvdf.review_scores_rating.cast('double')).drop(csvdf.review_scores_rating)

See if you get satisfying results.

You can convert this DataFrame to a local Pandas DataFrame using `toPandas` method. This will collect all the data into the driver's memory, so it's possible only with smaller datasets, such as this one.

Try that now.

In [28]:
csvdf.toPandas()



Unnamed: 0,id,listing_url,scrape_id,last_scraped,name,summary,space,description,experiences_offered,neighborhood_overview,...,instant_bookable,is_business_travel_ready,cancellation_policy,require_guest_profile_picture,require_guest_phone_verification,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,reviews_per_month
0,2818,https://www.airbnb.com/rooms/2818,20190708161114,2019-07-09,Quiet Garden View Room & Super Fast WiFi,Quiet Garden View Room & Super Fast WiFi,I'm renting a bedroom (room overlooking the ga...,Quiet Garden View Room & Super Fast WiFi I'm r...,none,"Indische Buurt (""Indies Neighborhood"") is a ne...",...,t,f,strict_14_with_grace_period,f,f,1,0,1,0,2.09
1,20168,https://www.airbnb.com/rooms/20168,20190708161114,2019-07-09,Studio with private bathroom in the centre 1,"Cozy studio on your own private floor, 100% in...",For those who like all facets of city life. In...,"Cozy studio on your own private floor, 100% in...",none,Located just in between famous central canals....,...,f,f,strict_14_with_grace_period,f,f,2,0,2,0,2.45
2,25428,https://www.airbnb.com/rooms/25428,20190708161114,2019-07-09,Lovely apt in City Centre (w.lift) near Jordaan,,"This nicely furnished, newly renovated apt is...","This nicely furnished, newly renovated apt is...",none,,...,f,f,strict_14_with_grace_period,f,f,2,2,0,0,0.17
3,27886,https://www.airbnb.com/rooms/27886,20190708161114,2019-07-09,"Romantic, stylish B&B houseboat in canal district",Stylish and romantic houseboat on fantastic hi...,For a romantic couple: A beautifully restored ...,Stylish and romantic houseboat on fantastic hi...,none,"Central, quiet, safe, clean and beautiful.",...,t,f,strict_14_with_grace_period,f,f,1,0,1,0,2.14
4,28871,https://www.airbnb.com/rooms/28871,20190708161114,2019-07-09,Comfortable double room,,In a monumental house right in the center of A...,In a monumental house right in the center of A...,none,,...,f,f,moderate,f,f,3,0,3,0,2.56
5,29051,https://www.airbnb.com/rooms/29051,20190708161114,2019-07-09,Comfortable single room,because of the city imposing a 4 paying guest ...,In a monumental house right in the center of A...,because of the city imposing a 4 paying guest ...,none,the street is quite lively especially on weeke...,...,f,f,moderate,f,f,3,0,3,0,4.23
6,31080,https://www.airbnb.com/rooms/31080,20190708161114,2019-07-09,2-story apartment + rooftop terrace,,My apartment is light and cosy. The three bed...,My apartment is light and cosy. The three bed...,none,,...,f,f,moderate,f,f,1,1,0,0,0.33
7,38266,https://www.airbnb.com/rooms/38266,20190708161114,2019-07-08,Nice and quiet place in the Jordaan,Cosy 1 bedroom apartment in Amsterdam's Jordaa...,"The apartment has a very nice feel with ""rivie...",Cosy 1 bedroom apartment in Amsterdam's Jordaa...,none,,...,f,f,strict_14_with_grace_period,f,f,1,1,0,0,1.82
8,41125,https://www.airbnb.com/rooms/41125,20190708161114,2019-07-09,Amsterdam Center Entire Apartment,,"A nice, sunny and spacious apartment in the ce...","A nice, sunny and spacious apartment in the ce...",none,,...,f,f,moderate,f,f,1,1,0,0,0.81
9,42970,https://www.airbnb.com/rooms/42970,20190708161114,2019-07-09,Comfortable room@PERFECT location + 2 bikes,A home away from home Great location Including...,"What I offer: great location, comfortable bed,...",A home away from home Great location Including...,none,My home is in the best neighborhood. Everythin...,...,t,f,strict_14_with_grace_period,f,t,2,0,2,0,4.18


## Array columns

Spark supports columns of complex types: arrays, nested structs and maps. 

If you look at the `amenities` column, you will see it contains comma-separated names of extra features each accomodation listing offers. We will convert that column to `array` type.

Select several values from that column.

In [7]:
csvdf.select(csvdf.amenities).show(10, False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|amenities                                                                                                                                                                                                                                                           

To convert this column to arrays you can simply use the `split` function. Do that now and replace the `amenities` column appropriately.

In [5]:
csvdf = csvdf.withColumn('amenities', F.split(csvdf.amenities, ',')).drop(csvdf.amenities)

Look at the new schema.

In [9]:
csvdf.printSchema()

root
 |-- id: string (nullable = true)
 |-- listing_url: string (nullable = true)
 |-- scrape_id: string (nullable = true)
 |-- last_scraped: string (nullable = true)
 |-- name: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- space: string (nullable = true)
 |-- description: string (nullable = true)
 |-- experiences_offered: string (nullable = true)
 |-- neighborhood_overview: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- transit: string (nullable = true)
 |-- access: string (nullable = true)
 |-- interaction: string (nullable = true)
 |-- house_rules: string (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- medium_url: string (nullable = true)
 |-- picture_url: string (nullable = true)
 |-- xl_picture_url: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_url: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_location: string (nullable

Select the first amenity of each listing and show the first 20.

In [10]:
csvdf.select(csvdf.amenities[0]).show()

+------------+
|amenities[0]|
+------------+
|    Internet|
|          TV|
|          TV|
|          TV|
|    Internet|
|    Internet|
|          TV|
|    Internet|
|          TV|
|          TV|
|          TV|
|          TV|
|          TV|
|          TV|
|          TV|
|          TV|
|          TV|
|  "Cable TV"|
|          TV|
|          TV|
+------------+
only showing top 20 rows



What do you think would happen if you tried to fetch a non-existant element? Try fetching 100th amenity of each listing.

In [11]:
csvdf.select(csvdf.amenities[100]).show()

+--------------+
|amenities[100]|
+--------------+
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
|          null|
+--------------+
only showing top 20 rows



Select all listings that offer a "Refrigerator". How many are there such listings in Amsterdam?

In [12]:
csvdf.where(F.array_contains(csvdf.amenities, 'Refrigerator')).count()

6815

## Grouped map Pandas UDF

You've already used Spark SQL UDFs in previous notebooks. Spark offers another, Python-specific, class of UDFs for performing vectorized processing using the Pandas library. Data transfer between the JVM and Python is done through the Arrow library, which we already enabled.

SCALAR Pandas UDFs convert one or more columns and produce a single output column of the same row count. The inner function operates on Pandas Series objects and produces a Seriec object.

GROUPED_MAP Pandas UDFs are used after a grouping operation with the `apply` function. They operate on the individual grouped subsets so that they can change their columns and numbers of rows. The inner function operates on a Pandas DataFrame object and produces a Pandas DataFrame object.

GROUPED_AGG Pandas UDFs are used after a grouping operation with the `agg` function. They also operate on the individual grouped subsets but they return a single primitive value for each of those.

To illustrate the GROUPED_MAP Pandas UDF, let's calculate the difference between price and minimum price per host and let's call that difference `mindiff`. First group by `host_id`, then `apply` a function decorated with `pandas_udf`. It is a good idea to minimize the number of columns that will be used inside the grouped map function (and minimize data transfering and conversions) so before calling groupBy select only `host_id` and `price` columns. 

To construct the `pandas_udf` decorator you need to supply a schema which you can specify as a string in SQL syntax (to the three selected column add `mindiff` of type `double`). The inner decorated function needs only a single argument (a Pandas DataFrame). Use Pandas API to add the new column to that DataFrame.

In [6]:
import pandas as pd

newschema = "host_id string, price double, mindiff double"  #StructType(csvdf.schema.fields + [StructField('mindiff', DoubleType())])

@F.pandas_udf(newschema, F.PandasUDFType.GROUPED_MAP)
def calcmindiff(pdf):
    mn = pdf.price.min()
    rez = pdf.assign(mindiff=pd.Series(pdf.price - mn).values)
    return rez

newcsvdf = csvdf.select('host_id', 'price').groupBy(csvdf.host_id).apply(calcmindiff)

Now order the results by `mindiff` descendingly to see the results.

In [11]:
newcsvdf.orderBy(F.col('mindiff').desc()).show()

+---------+--------+-------+
|  host_id|      id|mindiff|
+---------+--------+-------+
|178187873|23757459|  885.0|
|  3558013|14645445|  819.0|
|  3558013|11830487|  819.0|
|259132884|35394328|  806.0|
| 14047320|33339867|  720.0|
|  3357851|25706976|  720.0|
| 85420364|14144923|  710.0|
|131190497|33767215|  678.0|
| 14874061|26511662|  670.0|
| 39108038|21469784|  645.0|
|  1826139|  510836|  561.0|
| 76104209|22744417|  550.0|
|131423393|23388585|  550.0|
|131423393|23407039|  549.0|
| 76104209|23141400|  500.0|
| 11601106| 5860147|  500.0|
| 22879651|21992438|  493.0|
|   329249| 1233879|  490.0|
|   329249| 1237639|  490.0|
|  9957140| 2059084|  481.0|
+---------+--------+-------+
only showing top 20 rows



## Grouped aggregate Pandas UDFs

The second type of Pandas UDFs available in Spark is grouped aggregate Pandas UDFs. Let's say we're interested in a metric called "average price rating" defined as average price multiplied by rating, per host. 

To calculate it you can use Pandas dot product of prices and ratings (`review_scores_rating` column you previously converted to double) and divide the result with the number of prices. Wrap that in a GROUPED_AGG `pandas_udf` and call it after grouping by `host_id`. Show the 20 largest average price ratings. 

In [7]:
@F.pandas_udf("double", F.PandasUDFType.GROUPED_AGG)
def avgpricerating(price, rating):
    return price.dot(rating) / price.size

csvdf.groupBy('host_id').\
  agg(avgpricerating(csvdf.price, csvdf.review_scores_rating).alias('avgpricerating')).\
  orderBy(F.col('avgpricerating').desc()).show()

+---------+--------------+
|  host_id|avgpricerating|
+---------+--------------+
| 94719908|       99900.0|
|260778803|       99900.0|
|  6268597|       99900.0|
| 11152072|       90000.0|
| 55362111|       90000.0|
|119159862|       89600.0|
| 20644650|       88825.0|
| 23698637|       87500.0|
|  2525918|       85000.0|
| 29136151|       84000.0|
|  6035037|       81600.0|
| 57865143|       80000.0|
| 35652610|       80000.0|
| 11224822|       79500.0|
|193893028|       79380.0|
| 16607835|       79102.5|
| 79666655|       77600.0|
|  4398387|       76800.0|
| 13144908|       76000.0|
| 52504748|       75000.0|
+---------+--------------+
only showing top 20 rows



One important thing is that you cannot mix Pandas UDF aggregate functions and ordinary aggregate functions (such as count, or avg). Try calling your grouped aggreate function and `count` in the same `agg` call:

## Effect of number of partitions

Let's see how number of DataFrame partitions effects execution time. Begin the cell below with Jupyter notebook's `%%timeit` "magic command" which will execute the cell a number of times and report the average execution time. Then set the `spark.sql.shuffle.partitions` parameter to the default of 200. Finally, paste in the last command you successfully executed (finding the top 20 average price ratings) and execute the cell.

In [87]:
%%timeit
spark.conf.set("spark.sql.shuffle.partitions", 200)
csvdf.groupBy('host_id').\
  agg(avgpricerating(csvdf.price, csvdf.review_scores_rating).alias('avgpricerating')).\
  orderBy(F.col('avgpricerating').desc()).show()

+---------+--------------+
|  host_id|avgpricerating|
+---------+--------------+
| 94719908|       99900.0|
|260778803|       99900.0|
|  6268597|       99900.0|
| 11152072|       90000.0|
| 55362111|       90000.0|
|119159862|       89600.0|
| 20644650|       88825.0|
| 23698637|       87500.0|
|  2525918|       85000.0|
| 29136151|       84000.0|
|  6035037|       81600.0|
| 57865143|       80000.0|
| 35652610|       80000.0|
| 11224822|       79500.0|
|193893028|       79380.0|
| 16607835|       79102.5|
| 79666655|       77600.0|
|  4398387|       76800.0|
| 13144908|       76000.0|
| 52504748|       75000.0|
+---------+--------------+
only showing top 20 rows

+---------+--------------+
|  host_id|avgpricerating|
+---------+--------------+
| 94719908|       99900.0|
|260778803|       99900.0|
|  6268597|       99900.0|
| 11152072|       90000.0|
| 55362111|       90000.0|
|119159862|       89600.0|
| 20644650|       88825.0|
| 23698637|       87500.0|
|  2525918|       85000.0|
| 

Now do the same but use only 4 partitions.

In [88]:
%%timeit
spark.conf.set("spark.sql.shuffle.partitions", 4)
csvdf.groupBy('host_id').\
  agg(avgpricerating(csvdf.price, csvdf.review_scores_rating).alias('avgpricerating')).\
  orderBy(F.col('avgpricerating').desc()).show()

+---------+--------------+
|  host_id|avgpricerating|
+---------+--------------+
|  6268597|       99900.0|
|260778803|       99900.0|
| 94719908|       99900.0|
| 55362111|       90000.0|
| 11152072|       90000.0|
|119159862|       89600.0|
| 20644650|       88825.0|
| 23698637|       87500.0|
|  2525918|       85000.0|
| 29136151|       84000.0|
|  6035037|       81600.0|
| 35652610|       80000.0|
| 57865143|       80000.0|
| 11224822|       79500.0|
|193893028|       79380.0|
| 16607835|       79102.5|
| 79666655|       77600.0|
|  4398387|       76800.0|
| 13144908|       76000.0|
| 52504748|       75000.0|
+---------+--------------+
only showing top 20 rows

+---------+--------------+
|  host_id|avgpricerating|
+---------+--------------+
|  6268597|       99900.0|
|260778803|       99900.0|
| 94719908|       99900.0|
| 55362111|       90000.0|
| 11152072|       90000.0|
|119159862|       89600.0|
| 20644650|       88825.0|
| 23698637|       87500.0|
|  2525918|       85000.0|
| 

So which one is faster? Can you explain why?

## Struct and map columns

Now let's construct a struct column by collecting all columns starting with "host_" into a single column called "host". You can use the `struct` function for that (when you create fields of that struct, remove the prefix "host_"). Remove the old columns from the resulting DataFrame. (Hint: it will be easier if you first collect hostnames and host columns into lists.)

In [8]:
hostcolnames = [x for x in filter(lambda cn: cn.startswith('host_'), csvdf.columns)]
hostcols = [csvdf[x].alias(x[5:]) for x in hostcolnames]

In [9]:
csvdf_str = csvdf.withColumn('host', F.struct(hostcols).alias('host')).drop(*hostcolnames)

Now let's create a map column in a similar way: collect all columns whose names start with "review_scores_" and put them in a map called "review_scores". Remove that prefix from the column name and use the rest as the key name (also drop the old columns).

In [10]:
rscolnames = [x for x in filter(lambda cn: cn.startswith('review_scores_'), csvdf_str.columns)]
rscols = [csvdf_str[x].alias(x[14:]) for x in rscolnames]
rsaliases = [x[14:] for x in rscolnames]

In [11]:
mapcols = []
for rsc in rscolnames:
    mapcols.append(F.lit(rsc[14:]))
    mapcols.append(csvdf_str[rsc])
csvdf_str = csvdf_str.withColumn('review_scores', F.create_map(*mapcols)).drop(*rscolnames)

To use struct fields try to find "the most talkative" hosts by counting the average number of interpunction signs in the description of their listings (just use `split` function on the `description` field using the `[!\?\.,]` regex as delimiter). Group by host id.

In [59]:
csvdf_str.select(csvdf_str['host.id'].alias('hostid'), F.size(F.split(csvdf_str.description, '[!\?\.,]')).alias('talkative')).\
  groupBy('hostid').avg('talkative').toDF('hostid', 'talkative').orderBy(F.col('talkative').desc()).show()

+---------+---------+
|   hostid|talkative|
+---------+---------+
| 32533816|     70.0|
|  9595076|     56.0|
|167351025|     46.0|
| 93875577|     44.0|
| 10958477|     44.0|
| 30263650|     44.0|
| 18046762|     43.0|
| 53323264|     43.0|
|  9195515|     43.0|
| 20773372|     43.0|
|  2923635|     43.0|
|212839622|     43.0|
| 14183444|     42.0|
|  2297280|     42.0|
|190048519|     42.0|
|  9993468|     41.0|
| 66704160|     41.0|
| 92843617|     41.0|
|  1552059|     41.0|
|115911746|     40.0|
+---------+---------+
only showing top 20 rows



Display the 10 first review score ratings.

In [79]:
csvdf.select(csvdf['review_scores']['rating']).show(10, False)

+---------------------+
|review_scores[rating]|
+---------------------+
|98                   |
|88                   |
|100                  |
|99                   |
|97                   |
|95                   |
|95                   |
|99                   |
|96                   |
|98                   |
+---------------------+
only showing top 10 rows



Struct and map columns are still not supported by Arrow (support is being tracked by the following JIRA ticket: https://jira.apache.org/jira/browse/SPARK-21187). This applies only if struct or map columns need to be serialized to Arrow. If you try to run the previous grouped map UDF (calculating the `mindiff` column) on this new DataFrame, you would get an error. 

You can try that here. Copy and modify the command for calculating `mindiff` but instead of selecting only `host.id`, and `price` columns, also select the new `review_scores` column. You should get an "UnsupportedOperationException" because `review_scores` is a map column (if you are selecting only `host.id`, that is just a string column, not a struct). 

In [43]:
csvdf_str.select('host.id', 'price', 'review_scores').groupBy('id').apply(calcmindiff).\
   orderBy(F.col('mindiff').desc()).show()

Py4JJavaError: An error occurred while calling o1260.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 1 times, most recent failure: Lost task 0.0 in stage 24.0 (TID 1061, localhost, executor driver): java.lang.UnsupportedOperationException: Unsupported data type: map<string,string>
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowType(ArrowUtils.scala:56)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowField(ArrowUtils.scala:92)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:116)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:115)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowSchema(ArrowUtils.scala:115)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:71)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:311)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1933)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:192)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1867)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1866)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1866)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2038)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1035)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1017)
	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1426)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:207)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Unsupported data type: map<string,string>
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowType(ArrowUtils.scala:56)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowField(ArrowUtils.scala:92)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:116)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$$anonfun$toArrowSchema$1.apply(ArrowUtils.scala:115)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowSchema(ArrowUtils.scala:115)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:71)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:311)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1933)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:192)


A workaround would be to first convert the required columns to JSON format. Try that now (use the `to_json` function on `review_scores` columns), but also create a new version of the `pandas_udf` function because the expected schema needs to be modified now.

In [58]:
newschema2 = "id string, price double, review_scores string, mindiff double"

@F.pandas_udf(newschema2, F.PandasUDFType.GROUPED_MAP)
def calcmindiff2(pdf):
    mn = pdf.price.min()
    rez = pdf.assign(mindiff=pd.Series(pdf.price - mn).values)
    return rez

csvdf_str.select('host.id', 'price', F.to_json('review_scores').alias('review_scores')).groupBy('id').apply(calcmindiff2).\
    orderBy(F.col('mindiff').desc()).show()

+---------+-----+--------------------+-------+
|       id|price|       review_scores|mindiff|
+---------+-----+--------------------+-------+
|178187873|999.0|{"rating":"94.0",...|  885.0|
|  3558013|999.0|{"rating":"100.0"...|  819.0|
|  3558013|999.0|{"rating":"96.0",...|  819.0|
|259132884|895.0|{"rating":null,"a...|  806.0|
| 14047320|900.0|{"rating":null,"a...|  720.0|
|  3357851|800.0|{"rating":"73.0",...|  720.0|
| 85420364|850.0|{"rating":"98.0",...|  710.0|
|131190497|781.0|{"rating":"100.0"...|  678.0|
| 14874061|750.0|{"rating":null,"a...|  670.0|
| 39108038|795.0|{"rating":"97.0",...|  645.0|
|  1826139|700.0|{"rating":"97.0",...|  561.0|
| 76104209|850.0|{"rating":"60.0",...|  550.0|
|131423393|700.0|{"rating":"92.0",...|  550.0|
|131423393|699.0|{"rating":"94.0",...|  549.0|
| 76104209|800.0|{"rating":null,"a...|  500.0|
| 11601106|795.0|{"rating":"90.0",...|  500.0|
| 22879651|545.0|{"rating":null,"a...|  493.0|
|   329249|650.0|{"rating":"85.0",...|  490.0|
|   329249|65

Pandas has `read_json` function to read these values, but we will not go into that. Needless to say, this would introduce significant processing overhead so the current situation is far from ideal.

## Debugging DataFrames

DataFrames have a method called `explain` which you can use to find more information about how the DataFrame will be calculated. Execute it on the last DataFrame you created (the one with `review_scores` map).

In [60]:
csvdf_str.explain()

== Physical Plan ==
Project [id#10, listing_url#11, scrape_id#12, last_scraped#13, name#14, summary#15, space#16, description#17, experiences_offered#18, neighborhood_overview#19, notes#20, transit#21, access#22, interaction#23, house_rules#24, thumbnail_url#25, medium_url#26, picture_url#27, xl_picture_url#28, street#47, neighbourhood#48, neighbourhood_cleansed#49, neighbourhood_group_cleansed#50, city#51, ... 59 more fields]
+- FileScan csv [id#10,listing_url#11,scrape_id#12,last_scraped#13,name#14,summary#15,space#16,description#17,experiences_offered#18,neighborhood_overview#19,notes#20,transit#21,access#22,interaction#23,house_rules#24,thumbnail_url#25,medium_url#26,picture_url#27,xl_picture_url#28,host_id#29,host_url#30,host_name#31,host_since#32,host_location#33,... 82 more fields] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/spark/notebooks/airbnb/listings-corr.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,listing_url:string,

This shows you the physical plan. That might be helpful, but more helpful is the extended information. Call the `explain` function again, but now with the `extended` argument set to `True`.

In [78]:
csvdf_str.explain(True)

== Parsed Logical Plan ==
Project [id#2712, listing_url#2713, scrape_id#2714, last_scraped#2715, name#2716, summary#2717, space#2718, description#2719, experiences_offered#2720, neighborhood_overview#2721, notes#2722, transit#2723, access#2724, interaction#2725, house_rules#2726, thumbnail_url#2727, medium_url#2728, picture_url#2729, xl_picture_url#2730, street#2749, neighbourhood#2750, neighbourhood_cleansed#2751, neighbourhood_group_cleansed#2752, city#2753, ... 59 more fields]
+- Project [id#2712, listing_url#2713, scrape_id#2714, last_scraped#2715, name#2716, summary#2717, space#2718, description#2719, experiences_offered#2720, neighborhood_overview#2721, notes#2722, transit#2723, access#2724, interaction#2725, house_rules#2726, thumbnail_url#2727, medium_url#2728, picture_url#2729, xl_picture_url#2730, street#2749, neighbourhood#2750, neighbourhood_cleansed#2751, neighbourhood_group_cleansed#2752, city#2753, ... 66 more fields]
   +- Project [id#2712, listing_url#2713, scrape_id#2

Go to Web UI SQL tab here: http://192.168.10.2:4040/SQL and click on the the last executed job. In the "Details" section you should see the same information as above. 

You can notice a large number of "Project" dependencies in the logical plan. Sometimes, if your DataFrame has many columns and there are many such projections, this can significantly slow down Spark planning (which happens in the driver). To get rid of that problem, you can try and minimize the number of projections.

Do that now with your DataFrame. Try to come up with an equivalent DataFrame but using the minimum number of projections.

In [82]:
csvdf = spark.read.csv('/home/spark/notebooks/airbnb/listings-corr.csv', header=True, quote="~", escape="{", ignoreLeadingWhiteSpace=True)

hostcolnames = [x for x in filter(lambda cn: cn.startswith('host_'), csvdf.columns)]
hostcols = [csvdf[x].alias(x[5:]) for x in hostcolnames]
rscolnames = [x for x in filter(lambda cn: cn.startswith('review_scores_'), csvdf_str.columns)]
rscols = [csvdf_str[x].alias(x[14:]) for x in rscolnames]
rsaliases = [x[14:] for x in rscolnames]
mapcols = []
for rsc in rscolnames:
    mapcols.append(F.lit(rsc[14:]))
    mapcols.append(csvdf[rsc])

othercols = csvdf.columns.copy()
othercols.remove('price')
othercols.remove('amenities')
for c in hostcolnames:
    othercols.remove(c)
for c in rscolnames:
    othercols.remove(c)

csvdfnew = csvdf.select(F.substring_index(csvdf.price, '$', -1).cast('double').alias('price'),
  F.split(csvdf.amenities, ',').alias('amenities'),
  F.struct(hostcols).alias('host'),
  F.create_map(*mapcols).alias('review_scores'), *othercols)

csvdfnew.explain(True)

== Parsed Logical Plan ==
'Project [cast(substring_index(price#7549, $, -1) as double) AS price#7719, split(amenities#7547, ,) AS amenities#7720, named_struct(id, host_id#7508 AS id#7701, url, host_url#7509 AS url#7702, name, host_name#7510 AS name#7703, since, host_since#7511 AS since#7704, location, host_location#7512 AS location#7705, about, host_about#7513 AS about#7706, response_time, host_response_time#7514 AS response_time#7707, response_rate, host_response_rate#7515 AS response_rate#7708, acceptance_rate, host_acceptance_rate#7516 AS acceptance_rate#7709, is_superhost, host_is_superhost#7517 AS is_superhost#7710, thumbnail_url, host_thumbnail_url#7518 AS thumbnail_url#7711, picture_url, host_picture_url#7519 AS picture_url#7712, ... 12 more fields) AS host#7721, map() AS review_scores#7722, unresolvedalias('id, None), unresolvedalias('listing_url, None), unresolvedalias('scrape_id, None), unresolvedalias('last_scraped, None), unresolvedalias('name, None), unresolvedalias('summa

Your logical plan should be much simpler now.

## Partitioning & bucketing

Parquet files can be partitioned into several "folders". Data can also be "bucketed" into several files based on a bucketing column (or columns). Hashes of these values will be used to create bucket indexes. Furthermore, data within buckets can be sorted by one or more columns.

Let's try this out with your DataFrame. Partition it by the "state" column, bucket by "id" into 10 buckets and sort withing buckets by "longitude". Then save the result under some name and look at the results using a command shell.

In [21]:
csvdf_str.write.partitionBy('state').bucketBy(10, 'id').sortBy('longitude').saveAsTable('buckettest', mode='overwrite')

Now let's create three bucketed versions of the same data. Do not partition the data but only use bucketing. Save the first version as bucketed into 10 buckets by the "id" column, under name "bucketed1". For the second version use 20 buckets and name "bucketed2". Save the third version again with 10 buckets and name "bucketed3".

In [22]:
csvdf_str.write.bucketBy(10, 'id').saveAsTable('bucketed1')
csvdf_str.write.bucketBy(20, 'id').saveAsTable('bucketed2')
csvdf_str.write.bucketBy(10, 'id').saveAsTable('bucketed3')
csvdf_str.write.bucketBy(10, 'state').saveAsTable('bucketed4')

Next, execute the following line to turn off automatic broadcasting of small tables:

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")

Now, let's test the difference between joining "bucketed1" with "bucketed2" and with "bucketed3". First, join the first table with the second one, based on the "id" column. You can load the tables with `spark.table` and just call `count` to trigger the join.

In [27]:
spark.table('bucketed1').join(spark.table('bucketed2'), 'id').count()

20337

Examine the execution plan using the Web UI.

Now join the first table with the third and again examine the Web UI.

In [28]:
spark.table('bucketed1').join(spark.table('bucketed3'), 'id').count()

20337

Do you notice the difference? You should see an extra "Exchange" step inserted in the first join. That step can have dramatic consequences if the tables are large.