> Create a virtual environment.
> Example:
>
> 1. Create a new environment
>
>    ```
>    pyenv virtualenv 3.10.6 .envPyspark
>    pyenv activate .envPyspark
>    ```
>
> 2. Install the requirements
>    ```
>    pip install --upgrade pip 
>    pip install ipykernel pyspark==3.4.0
>    ```
>
> 3. Set the environment as the kernel for your notebook.
> 4. Tested with Java 8.

# Setup

## Imports

In [1]:
import os
from typing import List

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

## Constants

In [2]:
MAVEN_DEPENDENCIES: List[str] = [
    'io.delta:delta-core_2.12:2.4.0'
]

CURRENT_DIR: str = os.getcwd()
PATH_TMP: str = f'{CURRENT_DIR}/tmp'
TMP_DF_NAME: str = 'df_wiseupdata'
TMP_FULL_FILE_NAME: str = f"{PATH_TMP}/{TMP_DF_NAME}"


## Create the spark session

In [3]:
spark: SparkSession = (
    SparkSession.builder.config('spark.jars.packages', ','.join(MAVEN_DEPENDENCIES))
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog','org.apache.spark.sql.delta.catalog.DeltaCatalog')
    .appName('edf_core')
    .getOrCreate()
)

23/12/12 21:40:37 WARN Utils: Your hostname, DESKTOP-O03M3NM resolves to a loopback address: 127.0.1.1; using 172.26.72.227 instead (on interface eth0)
23/12/12 21:40:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/wiseupdata/.pyenv/versions/3.10.6/envs/.envPyspark/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/wiseupdata/.ivy2/cache
The jars for the packages stored in: /home/wiseupdata/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fe9aac11-279a-454c-91a9-de22f180ec38;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 131ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |

## Create the main DataFrame

In [4]:
df: DataFrame = spark.createDataFrame(
    [
        {'id': 1, 'name': 'Josh', 'age': 10, 'courses': ['Python', 'Java']},
        {'id': 1, 'name': 'Maria', 'courses': ['html']},
    ]
)

## Basic visualization

In [5]:
df.show()

+----+--------------+---+-----+
| age|       courses| id| name|
+----+--------------+---+-----+
|  10|[Python, Java]|  1| Josh|
|null|        [html]|  1|Maria|
+----+--------------+---+-----+



## Save the data temporarily

In [6]:
(
    df
    .write
    .mode('overwrite')
    .option('mergeSchema', True)
    .format('parquet')
    .save(TMP_FULL_FILE_NAME)
)

# Schema Examples

## Infer Schema

In [7]:
df: DataFrame = (
    spark
    .read
    .format('parquet')
    .load(TMP_FULL_FILE_NAME)
)

In [8]:
df.show()

+----+--------------+---+-----+
| age|       courses| id| name|
+----+--------------+---+-----+
|  10|[Python, Java]|  1| Josh|
|null|        [html]|  1|Maria|
+----+--------------+---+-----+



In [9]:
df.schema

StructType([StructField('age', LongType(), True), StructField('courses', ArrayType(StringType(), True), True), StructField('id', LongType(), True), StructField('name', StringType(), True)])

Spark inferred the **age** column as Long.

## Fixed schema option 01

In [17]:
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, LongType, IntegerType

In [18]:
df_schema_01 = StructType([StructField('age', IntegerType(), True), StructField('courses', ArrayType(StringType(), True), True), StructField('id', LongType(), True), StructField('name', StringType(), True)])

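This schema forces the **age** column to Integer. Because reads are lazy, the cells below only declare and echo the schema; no Parquet data is read until an action such as `show()` runs.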

In [19]:
df_with_fixed_schema_01: DataFrame = (
    spark
    .read
    .format('parquet')
    .schema(df_schema_01)
    .load(TMP_FULL_FILE_NAME)
)

In [20]:
df_with_fixed_schema_01.schema

StructType([StructField('age', IntegerType(), True), StructField('courses', ArrayType(StringType(), True), True), StructField('id', LongType(), True), StructField('name', StringType(), True)])

## Fixed schema option 02

In [53]:
df_schema_02 = "age long, courses array<string>, id long, name string"

In [54]:
df_with_fixed_schema_02: DataFrame = (
    spark
    .read
    .format('parquet')
    .schema(df_schema_02)
    .load(TMP_FULL_FILE_NAME)
)

In [55]:
df_with_fixed_schema_02.show()

+----+--------------+---+-----+
| age|       courses| id| name|
+----+--------------+---+-----+
|  10|[Python, Java]|  1| Josh|
|null|        [html]|  1|Maria|
+----+--------------+---+-----+



## A fixed schema does not permit a direct conversion from Long to Int

In [59]:
df_schema_03 = "age int, courses array<string>, id long, name string"

df_with_fixed_schema_03: DataFrame = (
    spark
    .read
    .format('parquet')
    .schema(df_schema_03)
    .load(TMP_FULL_FILE_NAME)
)


# The read is lazy, so the type mismatch only surfaces when an action runs.
try:
    df_with_fixed_schema_03.show()
except Exception as e:
    print(e)

23/12/12 21:55:47 ERROR Executor: Exception in task 0.0 in stage 22.0 (TID 38)
org.apache.spark.SparkException: Parquet column cannot be converted in file file:///home/wiseupdata/public/pyspark/examples/pyspark/tmp/df_wiseupdata/part-00003-1c7c46e7-ebc6-47aa-aa5b-327f3cbcdd4b-c000.snappy.parquet. Column: [age], Expected: int, Found: INT64.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:868)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:298)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.pr

An error occurred while calling o266.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 38) (172.26.72.227 executor driver): org.apache.spark.SparkException: Parquet column cannot be converted in file file:///home/wiseupdata/public/pyspark/examples/pyspark/tmp/df_wiseupdata/part-00003-1c7c46e7-ebc6-47aa-aa5b-327f3cbcdd4b-c000.snappy.parquet. Column: [age], Expected: int, Found: INT64.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:868)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:298)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Generat

# References

1. https://spark.apache.org/docs/3.4.0/api/python/reference/index.html