# Pyspark Fu

## 1. Initialising the Spark Session

In [1]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Spark settings that keep a local notebook session lightweight:
# hide the console progress bar, cap how much job/stage/task/execution
# history spark retains at 1 entry each, and request a single executor.
CONF = dict([
    ('spark.ui.showConsoleProgress',       'false'),
    ('spark.ui.dagGraph.retainedRootRDDs', '1'),
    ('spark.ui.retainedJobs',              '1'),
    ('spark.ui.retainedStages',            '1'),
    ('spark.ui.retainedTasks',             '1'),
    ('spark.sql.ui.retainedExecutions',    '1'),
    ('spark.worker.ui.retainedExecutors',  '1'),
    ('spark.worker.ui.retainedDrivers',    '1'),
    ('spark.executor.instances',           '1'),
])

def spark_session(master: str = 'local[1]', log_level: str = 'ERROR') -> SparkSession:
    '''
    Create (or reuse) a lightweight local SparkSession for experimenting.

    - applies every setting in CONF, which trims how much UI history spark keeps
    - master: spark master URL; the default local[1] locks the spark runtime
      to a single core
    - log_level: passed to setLogLevel; the default ERROR silences noisy
      warning logs
    - returns: the SparkSession

    Note: getOrCreate() hands back an already-running session if there is one,
    in which case settings such as the master cannot be changed.
    '''
    # dict.items() already yields (key, value) pairs, which is what setAll takes
    conf = SparkConf().setAll(CONF.items())

    # named `session` rather than `sc`, which conventionally means a SparkContext
    session = SparkSession.builder.master(master).config(conf=conf).getOrCreate()
    session.sparkContext.setLogLevel(log_level)
    return session

In [2]:
# Create the single local session (local[1]) that every following cell uses as `spark`
spark = spark_session()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/30 08:00:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 2. Create a simple dataframe for debugging


- The official pyspark docs often don't show how to create the dataframe their code examples refer to, so here's a small one to experiment with

In [3]:
# A tiny dataframe with a plain string column and a nested dict column
df = spark.createDataFrame(data=[
    {'a': 'b', 'n': {'a': 'b'}},
    {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},
    {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}},
])

df.show(truncate=False)

+---+---------------------------+
|a  |n                          |
+---+---------------------------+
|b  |{a -> b}                   |
|c  |{y -> b, z -> x}           |
|d  |{2 -> 3, t -> a, o -> null}|
+---+---------------------------+



## 3. Joins

### 3.1. Avoid duplicate column names

In [4]:
# Two tiny tables that share an `id` column to join on
# (they also share a `name` column, which matters later)
df1 = spark.createDataFrame(data=[
    {'id': '123', 'name': 'pikachu'},
    {'id': '999', 'name': 'evee'},
    {'id': '007', 'name': 'charizard'},
])
df2 = spark.createDataFrame(data=[
    {'id': '123', 'name': 'ash'},
    {'id': '999', 'name': 'chloe'},
    {'id': '007', 'name': 'ash'},
])

df1.show(), df2.show()

+---+---------+
| id|     name|
+---+---------+
|123|  pikachu|
|999|     evee|
|007|charizard|
+---+---------+

+---+-----+
| id| name|
+---+-----+
|123|  ash|
|999|chloe|
|007|  ash|
+---+-----+



(None, None)

In [5]:
# Combine pokemon and trainers by joining on an equality *expression*;
# with this form both `id` columns are kept in the result
joined = df1.join(df2, on=df1['id'] == df2['id'], how='inner')
joined.show()

+---+---------+---+-----+
| id|     name| id| name|
+---+---------+---+-----+
|007|charizard|007|  ash|
|123|  pikachu|123|  ash|
|999|     evee|999|chloe|
+---+---------+---+-----+



This _seems_ fine initially, but spark blows up as soon as you try to use the 'id' column in an expression

This example will produce the error:

``[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].``

This can be particularly annoying because the join itself succeeds: the error only appears when you reference one of the duplicated columns, so it can go unnoticed until much later

In [6]:
import pyspark.sql.utils
from pyspark.sql import DataFrame
from typing import List

def try_select(df: DataFrame, cols: List[str]):
    '''
    Select `cols` from `df` and show the result.

    If spark cannot resolve the select (e.g. an ambiguous or unknown column),
    the AnalysisException is printed instead of being raised.
    '''
    try:
        selected = df.select(*cols)
        selected.show()
    except pyspark.sql.utils.AnalysisException as err:
        print('select failed!', err)

In [7]:
# NOTE: neither df1 nor df2 has a 'trainer' column, so this select could not
# succeed even without the ambiguity; spark reports the ambiguous `id` here
try_select(df=joined, cols=['id', 'name', 'trainer'])

select failed! [AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].


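The solution: pass a list of column names to `on` instead of a column expression. Spark then keeps only one copy of each join key, although any other column names the two tables share (here, `name`) are still duplicated and ambiguous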

### 3.1.2 Join using list of names

In [8]:
# Joining on a list of column *names* makes spark emit a single `id` column
joined = df1.join(df2, on=['id'], how='inner')
joined.show()

# ...but the same select still fails: the non-key `name` column exists on both sides
try_select(df=joined, cols=['id', 'name', 'trainer'])

+---+---------+-----+
| id|     name| name|
+---+---------+-----+
|007|charizard|  ash|
|123|  pikachu|  ash|
|999|     evee|chloe|
+---+---------+-----+

select failed! [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`name`, `name`].


### 3.1.3 Dataframe aliasing is a bit weird

In [9]:
(
    df1
    .alias('pokemon')
    .select('*')
    .show()
)

+---+---------+
| id|     name|
+---+---------+
|123|  pikachu|
|999|     evee|
|007|charizard|
+---+---------+



In [10]:
import pyspark.sql.functions as F

# Alias each side so the duplicate columns can be told apart by table name
joined = df1.alias('pokemon').join(
    other=df2.alias('trainers'),
    on=F.col('pokemon.id') == F.col('trainers.id'),
    how='inner',
)
joined.show()
joined.columns

+---+---------+---+-----+
| id|     name| id| name|
+---+---------+---+-----+
|007|charizard|007|  ash|
|123|  pikachu|123|  ash|
|999|     evee|999|chloe|
+---+---------+---+-----+



['id', 'name', 'id', 'name']

Now, our error message is much better, as it contains the dataframe aliases identifying which table the duplicate column name is from

In [11]:
try_select(df=joined, cols=['id'])

select failed! [AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`pokemon`.`id`, `trainers`.`id`].


Confusingly, `DataFrame.columns` does not show the aliases, but they are still usable when selecting

In [12]:
print(joined.columns)

# `.columns` doesn't include the alias prefix, but selecting with it still works
try_select(df=joined, cols=['pokemon.id'])

['id', 'name', 'id', 'name']
+---+
| id|
+---+
|007|
|123|
|999|
+---+



## 4. Default empty DataFrames

Sometimes it's handy to be able to instantiate an "empty" dataframe in the case that a file/some source data is missing

In [13]:
# Reading a JSON path that doesn't exist raises an AnalysisException
# ([PATH_NOT_FOUND]); catch it here so we can see the message
from pyspark.errors.exceptions.captured import AnalysisException

try:
    spark.read.json(path='optional_source.json')
except AnalysisException as err:
    print(err)

[PATH_NOT_FOUND] Path does not exist: file:/Users/tomm/dev/tmck-code.github.io/articles/20230605_pyspark_fu/optional_source.json.


We can mitigate this by catching the exception, and creating a dataframe that matches the schema, but has 0 rows.

This ensures that any queries on the dataframe will still work, as all the columns will exist with the correct type.

_**This requires that we know the schema of the optional file**_


The easiest way to create a schema is usually to create a single-line file containing a valid line that matches the expected schema. Then, read that file into a dataframe and capture the schema for re-use (read: copy/paste)

In [14]:
import json

# Write one example record to disk and let spark infer its schema
example_row = {'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}}
with open('not_there.json', 'w') as ostream:
    json.dump(example_row, ostream)

spark.read.json('not_there.json').schema.json()

'{"fields":[{"metadata":{},"name":"attrs","nullable":true,"type":{"fields":[{"metadata":{},"name":"a","nullable":true,"type":"string"}],"type":"struct"}},{"metadata":{},"name":"id","nullable":true,"type":"long"},{"metadata":{},"name":"key","nullable":true,"type":"string"}],"type":"struct"}'

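I've never found a way (using StringIO or similar) to achieve this without writing a file - if you find a way then let me know! (One candidate: `spark.read.json` also accepts an RDD of JSON strings, e.g. `spark.read.json(spark.sparkContext.parallelize([json.dumps(row)]))`, which skips the temporary file.)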

Let's bundle this up into a method that tidies up after itself:

In [15]:
import json
import os

def guess_schema(row: dict, tmp_fpath: str = 'tmp.json') -> dict:
    '''
    Infer a spark schema (as a dict) from a single example record.

    Writes `row` as one JSON line to `tmp_fpath`, lets spark infer a schema
    from that file, then deletes the file again.

    - row: an example record shaped like the real data
    - tmp_fpath: scratch file path; any existing file at this path is
      overwritten and then removed
    - returns: the inferred StructType as a plain dict, suitable for
      `T.StructType.fromJson`
    '''
    with open(tmp_fpath, 'w') as ostream:
        json.dump(row, ostream)
    try:
        # without an explicit schema, spark.read.json scans the input to infer
        # one, so the file is no longer needed once we have the schema
        return json.loads(spark.read.json(tmp_fpath).schema.json())
    finally:
        # clean up even if spark fails to read/parse the file
        os.remove(tmp_fpath)

In [16]:
schema = guess_schema(row={'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}})
print(json.dumps(schema, indent=2))

{
  "fields": [
    {
      "metadata": {},
      "name": "attrs",
      "nullable": true,
      "type": {
        "fields": [
          {
            "metadata": {},
            "name": "a",
            "nullable": true,
            "type": "string"
          }
        ],
        "type": "struct"
      }
    },
    {
      "metadata": {},
      "name": "id",
      "nullable": true,
      "type": "long"
    },
    {
      "metadata": {},
      "name": "key",
      "nullable": true,
      "type": "string"
    }
  ],
  "type": "struct"
}


As you can see from this quick demo, it isn't quick to craft pyspark schemas by hand! In my experience it's prone to much human error and frustrating debugging, especially as schemas can grow large very quickly!

Now, we can tie this into the method to safely load/create a dataframe

In [17]:
from pyspark.errors.exceptions.captured import AnalysisException
import pyspark.sql.types as T

def _to_struct(schema) -> T.StructType:
    '''Normalise a schema given either as a dict (e.g. from guess_schema) or as a StructType.'''
    if isinstance(schema, T.StructType):
        return schema
    return T.StructType.fromJson(schema)


def safe_load(fpath: str, schema: 'dict | T.StructType', enforce_schema: bool = False) -> 'DataFrame':
    '''
    Read a JSON source with spark, falling back to an empty dataframe if it can't be loaded.

    - fpath: path to the (possibly missing) JSON source
    - schema: the expected schema, as a dict (e.g. from guess_schema) or a T.StructType
    - enforce_schema: if True, also apply `schema` when the file *does* load, so
      both code paths produce the same columns and types. Defaults to False,
      which keeps spark's inferred schema for files that exist.
    - returns: a DataFrame; the fallback has 0 rows but every column in `schema`

    Any AnalysisException (e.g. [PATH_NOT_FOUND]) is printed, not raised.
    '''
    try:
        if enforce_schema:
            return spark.read.json(fpath, schema=_to_struct(schema))
        return spark.read.json(fpath)
    except AnalysisException as e:
        print(e)
        # the schema is only converted when needed, so a file that loads never
        # depends on `schema` being valid unless enforce_schema is set
        return spark.createDataFrame([], schema=_to_struct(schema))

> Side note: the method to convert a dict to a StructType (schema) is confusingly named `fromJson` despite the fact that the method accepts a dict, not a JSON string

In [18]:
# NOTE: 'not_there.json' was written to disk by the schema-discovery cell above,
# so this call loads that file rather than exercising the empty-dataframe fallback
df = safe_load(fpath='not_there.json', schema=schema)

In [19]:
df.show(n=20, truncate=True)

+-----+---+----+
|attrs| id| key|
+-----+---+----+
|  {b}|123|yolo|
+-----+---+----+



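Note: `not_there.json` was written to disk by the schema-discovery cell above, so this `safe_load` call actually found the file and loaded its single row rather than falling back. Point it at a path that really is missing (e.g. `optional_source.json`) to get the empty, correctly-typed dataframe instead.

After the initial generation, the schema can be stored in a file and loaded, or just defined directly in the code, rather than "guessed" every time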

## Converting StructType -> JSON -> StructType

In [20]:
df = spark.createDataFrame(data=[
    {'a': 'b', 'n': {'a': 'b'}},
    {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},
    {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}},
])
df.show(truncate=False)

# the schema as spark's compact JSON string
print(df.schema.json())

+---+---------------------------+
|a  |n                          |
+---+---------------------------+
|b  |{a -> b}                   |
|c  |{y -> b, z -> x}           |
|d  |{2 -> 3, t -> a, o -> null}|
+---+---------------------------+

{"fields":[{"metadata":{},"name":"a","nullable":true,"type":"string"},{"metadata":{},"name":"n","nullable":true,"type":{"keyType":"string","type":"map","valueContainsNull":true,"valueType":"string"}}],"type":"struct"}


In [21]:
from dataclasses import dataclass
import json

from pyspark.sql import DataFrame

@dataclass
class Schema:
    '''Small helper for viewing a DataFrame's schema as JSON.'''
    df: DataFrame

    def as_json(self) -> str:
        '''The schema as spark's compact JSON string.'''
        schema = self.df.schema
        return schema.json()

    def as_dict(self) -> dict:
        '''The schema JSON parsed into plain python objects.'''
        raw = self.as_json()
        return json.loads(raw)

    def show(self) -> None:
        '''Pretty-print the schema with 2-space indentation.'''
        pretty = json.dumps(self.as_dict(), indent=2)
        print(pretty)

In [22]:
Schema(df=df).show()

{
  "fields": [
    {
      "metadata": {},
      "name": "a",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "n",
      "nullable": true,
      "type": {
        "keyType": "string",
        "type": "map",
        "valueContainsNull": true,
        "valueType": "string"
      }
    }
  ],
  "type": "struct"
}


## Generating JSON output with dynamic keys

To demonstrate the problem, we will
1. read in a JSON file matching the dataframe above, with a few different nested types (e.g. strings, numbers, and null)
2. remove any key/value pairs with null values
3. write a JSON file where all nested types match the input

In [23]:
example_df = spark.createDataFrame([
    {'a': 'b', 'n': {'a': 'b'}},
    {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},
    {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}
])

# display the dataframe this cell builds; its schema is reused later
# when reading tmp.json
example_df.show(truncate=False)

+---+---------------------------+
|a  |n                          |
+---+---------------------------+
|b  |{a -> b}                   |
|c  |{y -> b, z -> x}           |
|d  |{2 -> 3, t -> a, o -> null}|
+---+---------------------------+



In [24]:
data = [
    {'a': 'b', 'n': {'a': True, 'z': ['1', 7, True]}},
    {'a': 'c', 'n': {'a': 'b', 'z': 'x', 't': None}},
    {'a': 'd', 'n': {'a': 3, 'z': None}},
]

# Write one JSON object per line (the default layout spark.read.json expects).
# The `with` block closes (and flushes) the file before spark reads it in a later
# cell, instead of leaving an unclosed handle for the garbage collector.
with open('tmp.json', 'w') as ostream:
    print('\n'.join(json.dumps(r) for r in data), file=ostream)

In [25]:
cat tmp.json

{"a": "b", "n": {"a": true, "z": ["1", 7, true]}}
{"a": "c", "n": {"a": "b", "z": "x", "t": null}}
{"a": "d", "n": {"a": 3, "z": null}}


Now, to read the file with spark

In [26]:
# let spark infer the schema from the file itself
df = spark.read.json(path='tmp.json')
df.show(truncate=False)

+---+--------------------------+
|a  |n                         |
+---+--------------------------+
|b  |{true, null, ["1",7,true]}|
|c  |{b, null, x}              |
|d  |{3, null, null}           |
+---+--------------------------+



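This looks a little weird! It's because `spark.createDataFrame` and `spark.read.json` infer nested dicts differently. `createDataFrame` produced a `map` for `n` (see the schema printed earlier). `read.json` instead produces a `struct` with one field per key found anywhere in the file, filling in `null` for keys a row doesn't have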

In [27]:
Schema(df=df).show()

{
  "fields": [
    {
      "metadata": {},
      "name": "a",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "n",
      "nullable": true,
      "type": {
        "fields": [
          {
            "metadata": {},
            "name": "a",
            "nullable": true,
            "type": "string"
          },
          {
            "metadata": {},
            "name": "t",
            "nullable": true,
            "type": "string"
          },
          {
            "metadata": {},
            "name": "z",
            "nullable": true,
            "type": "string"
          }
        ],
        "type": "struct"
      }
    }
  ],
  "type": "struct"
}


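You might notice that spark has inferred all of the nested values as strings, and has turned `n` into a struct with one field per key it saw (`a`, `t` and `z`) rather than a map. To keep `n` as a map with dynamic keys, pass the schema from `example_df` when reading the file: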

In [28]:
# reuse example_df's schema (map-typed `n`) instead of letting spark infer a struct
df = spark.read.schema(example_df.schema).json('tmp.json')
df.show(truncate=False)

+---+------------------------------+
|a  |n                             |
+---+------------------------------+
|b  |{a -> true, z -> ["1",7,true]}|
|c  |{a -> b, z -> x, t -> null}   |
|d  |{a -> 3, z -> null}           |
+---+------------------------------+



In [29]:
import pyspark.sql.functions as F

# Drop map entries whose value is null. The lambda receives Column objects and
# builds a spark expression; it is not run in python for each row.
df_filtered = df.select(
    'a',
    F.map_filter('n', lambda _k, v: v.isNotNull()).alias('n'),
)
df_filtered.show(truncate=False)

+---+------------------------------+
|a  |n                             |
+---+------------------------------+
|b  |{a -> true, z -> ["1",7,true]}|
|c  |{a -> b, z -> x}              |
|d  |{a -> 3}                      |
+---+------------------------------+



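Note that this isn't a python UDF: `F.map_filter` turns the lambda into a native Spark higher-order function, so it runs inside the JVM without shipping rows to python. The same filter can also be written as a pure SQL statement: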

In [30]:
df.createOrReplaceTempView("df")

filter_nulls_sql = "select a, map_filter(n, (k,v) -> v is not null) custom from df"
df_filtered = spark.sql(filter_nulls_sql)
df_filtered.show(truncate=False)

+---+------------------------------+
|a  |custom                        |
+---+------------------------------+
|b  |{a -> true, z -> ["1",7,true]}|
|c  |{a -> b, z -> x}              |
|d  |{a -> 3}                      |
+---+------------------------------+



In [31]:
df_filtered.write.json('tmp_out.json', mode='overwrite')

In [32]:
cat tmp_out.json/part*

{"a":"b","custom":{"a":"true","z":"[\"1\",7,true]"}}
{"a":"c","custom":{"a":"b","z":"x"}}
{"a":"d","custom":{"a":"3"}}


As we can see here, this is very close! All of the types have been _somewhat_ preserved:

- All strings are still strings
- All other types are JSON-encoded strings