# Convert Spark Talbes and Functions to sql-language-server Schema File

This notebooks creates a simple table with nested columns. It then exports the table and spark functions to a json file which sql-language-server can use to code-complete SQL statement for spark sql.

This notebook serves as an illustration of how to do this conversion. This code will eventually be packaged into a pip install so that it can be reused more easily.

This code could also be leveraged by the %%sparksql (sparksql-magic) which would make it transparent to the user. Since generation of the schema takes a few seconds, %%sparksql could detect how up-to-date the schema file is and re-generated it only when it passes a certain threshold (say 15min).

In [19]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
%load_ext sparksql

In [21]:
%%sparksql
DROP TABLE IF EXISTS student

In [23]:
%%sparksql
CREATE TABLE student (
    id INT, 
    name STRING,
    `nestedwithspaces` STRUCT<`sub field`:STRUCT<`sub field2`:STRING>>,
    age INT,
    books ARRAY<STRUCT<`title`:STRING, `chapters`:ARRAY<STRUCT<`paragraph`:STRING>>>>,
    struct_col STRUCT<`address`:STRUCT<`streetName`:STRING, `streetNumber`:BIGINT>>,
    map_col MAP<STRING, MAP<STRING, STRUCT<`start`:BIGINT,`end`:BIGINT>>>

    ) USING PARQUET

In [24]:
%%sparksql
/*
    We should have a student table in our database.    
*/
SHOW TABLES

0,1,2
database,tableName,isTemporary
default,student,False


In [25]:
%%sparksql
/*
    We can see the column nestedwithspaces contains sub-fields with spaces.
    Also books is an array of structs and map_col is of MapType.
*/
DESCRIBE TABLE student

0,1,2
col_name,data_type,comment
id,int,
name,string,
nestedwithspaces,struct<sub field:struct<sub field2:string>>,
age,int,
books,"array<struct<title:string,chapters:array<struct<paragraph:string>>>>",
struct_col,"struct<address:struct<streetName:string,streetNumber:bigint>>",
map_col,"map<string,map<string,struct<start:bigint,end:bigint>>>",


# Generate Schema File for sql-language-server
Normally the spark catalog would point to a data warehouse full of schemas and tables. In this demo we only have a single student table which we just created.

In [26]:
import findspark
findspark.init()
import json
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

In [27]:
def getTypeName(t):
    if type(t) == LongType:
        return 'long'
    if type(t) == IntegerType:
        return 'integer'
    if type(t) == StringType:
        return 'string'
    if type(t) == ArrayType:
        return 'array'
    if type(t) == StructType:
        return 'struct'
    if type(t) == MapType:
        return 'map'

def getPath(path, name):
    if ' ' in name:
        name = '`' + name + '`'
    if len(path) > 0:
        return f'{path}.{name}'
    return name

def getChildren(field, path, fields):
    if type(field) == StructField:
        getChildren(field.dataType, getPath(path, field.name), fields)
    elif type(field) == MapType:
        getChildren(field.valueType, getPath(path, 'key'), fields)
    elif type(field) == ArrayType:
        getChildren(field.elementType, path, fields)
    elif type(field) == StructType:
        for name in field.fieldNames():
            child = field[name]
            fields.append({
                'columnName': getPath(path, name),
                'metadata': child.metadata, 
                'type': getTypeName(child.dataType),
                'description': getTypeName(child.dataType)
            })
            getChildren(child, path, fields)

def getColumns(name):
    fields = []
    getChildren(spark.table(name).schema, '', fields)
    return fields

def getTables(database):
    rows = spark.sql(f'SHOW TABLES IN {database}').collect()
    return list(map(lambda r: {
        "tableName": r.tableName,
        "columns": getColumns(r.tableName), 
        "database": None
    }, rows))

def getDescription(name):
    rows = spark.sql(f'DESCRIBE FUNCTION EXTENDED {name}').collect()
    textLines = list(map(lambda r: r.function_desc, rows))
    return "\n".join(textLines)

def getFunctions():
    rows = spark.sql('SHOW FUNCTIONS').collect()
    return list(map(lambda f: {
        "name": f.function, 
        "description": getDescription(f.function)
    }, rows))

def getSparkDatabaseSchema():
    return {
        "tables": getTables('default'), 
        "functions": getFunctions()
    }

sparkdb_schema = getSparkDatabaseSchema()

In [None]:
# Save schema to disk. sql-language-server will pickup any changes to this file.
outputFileName = '/tmp/sparkdb.schema.json'
with open(outputFileName, 'w') as fout:
    json.dump(sparkdb_schema, fout, sort_keys=True, indent=2)