### Lesson #1: There is no inherent Spark version in Hadoop YARN

In [None]:
import sys
!{sys.executable} -m pip install pyspark==3.2.1

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse, if one is already running) a Spark session whose cluster
# manager is YARN; the PySpark library driving it is the one installed above.
spark = SparkSession.builder.master('yarn').getOrCreate()
spark

In [None]:
import sys
!{sys.executable} -m pip install pyspark==3.0.0

### Lesson #2: Python UDFs require Python interpreters, duh!

In [None]:
from pyspark.sql.functions import udf


def get_python_version():
    """Report which Python interpreter (path and version) evaluates this function."""
    # Imported at call time so it describes the process running the UDF.
    import sys
    return f'{sys.executable} v{sys.version}'


py_udf = udf(get_python_version)
(spark.range(3)
      .withColumn("newCol", py_udf())
      .show(truncate=False))

In [None]:
import os
from pyspark.sql import SparkSession

# getOrCreate() hands back an already-running session unchanged, and
# PYSPARK_PYTHON only takes effect when a new SparkContext is created, so stop
# any session left over from an earlier cell before reconfiguring.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

# Have executors launch Python workers with the `python3` found on their PATH.
os.environ['PYSPARK_PYTHON'] = 'python3'
spark = (
    SparkSession.builder
    .master('yarn')
    .getOrCreate()
)
spark

### Lesson #3: It’s Bring Your Own Python to Cluster Day!

In [None]:
# Install conda-pack so the `conda pack` subcommand used below is available.
!/opt/conda/bin/conda install -y conda-pack
# Create a separate environment named conda-env pinned to Python 3.8.13.
!/opt/conda/bin/conda create -y python=3.8.13 --name=conda-env
# Pack that environment into ./conda-env.tgz so it can be shipped to the cluster;
# -f overwrites an existing archive at the output path.
# NOTE(review): assumes named envs are created under /opt/conda/envs — confirm.
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz

In [None]:
import os
from pyspark.sql import SparkSession

# getOrCreate() would return a session started by an earlier cell unchanged,
# silently ignoring the archive and PYSPARK_PYTHON below, so stop it first.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

# The archive is extracted in each YARN container under the alias after '#',
# so executors find the packed interpreter at ./conda-env/bin/python3.8.
os.environ['PYSPARK_PYTHON'] = './conda-env/bin/python3.8'
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.archives', './conda-env.tgz#conda-env')
    .getOrCreate()
)
spark

In [None]:
from pyspark.sql.functions import udf


def get_python_version():
    """Return the executable path and version string of the interpreter running the UDF."""
    # Resolved on each call, i.e. inside whichever Python worker evaluates it.
    import sys
    return f'{sys.executable} v{sys.version}'


py_udf = udf(get_python_version)
(spark.range(3)
      .withColumn("newCol", py_udf())
      .show(truncate=False))

### Lesson #4: PyPI packages hitch a ride when distributing conda environments

In [None]:
# Use conda-env's own pip so pandas is installed inside that environment.
!/opt/conda/envs/conda-env/bin/pip install pandas==1.4.2
# Re-pack the environment so the archive shipped to the cluster includes pandas;
# -f overwrites the conda-env.tgz built in the previous lesson.
!/opt/conda/bin/conda pack --prefix /opt/conda/envs/conda-env -f -o ./conda-env.tgz

In [None]:
import os
from pyspark.sql import SparkSession

# A session from an earlier cell would be returned as-is by getOrCreate(),
# still using the old archive, so stop it before shipping the repacked env.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

# Executors run Python from the extracted archive (alias after '#').
os.environ['PYSPARK_PYTHON'] = './conda-env/bin/python3.8'
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.archives', './conda-env.tgz#conda-env')
    .getOrCreate()
)
spark

In [None]:
from pyspark.sql.functions import udf


def get_pandas_version():
    """Report the pandas version importable by the Python process evaluating this UDF."""
    # Raises ImportError on any worker whose environment lacks pandas.
    import pandas
    return f'pandas v{pandas.__version__}'


py_udf = udf(get_pandas_version)
(spark.range(3)
      .withColumn("newCol", py_udf())
      .show(truncate=False))

### Lesson #5: New data source? There’s a Jar for that!

In [None]:
!curl https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar -o bq.jar

In [None]:
from pyspark.sql import SparkSession

# Stop any session from an earlier cell; getOrCreate() would otherwise return
# it unchanged and ignore the jar settings below.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

# spark.yarn.dist.jars ships the jar to the executors; spark.driver.extraClassPath
# adds it to the driver. NOTE: the driver class path is applied only when the
# driver JVM is launched, so restart the kernel if Spark was already started in it.
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.yarn.dist.jars', './bq.jar')
    .config('spark.driver.extraClassPath', './bq.jar')
    .getOrCreate()
)
spark

Generate an OAuth access token for the BigQuery connector by running `gcloud auth application-default print-access-token` in a terminal.

In [None]:
import os
from getpass import getpass

# Read credentials at run time instead of pasting them into the notebook, where
# they would be saved with the file. Set GCP_ACCESS_TOKEN (e.g. to the output of
# `gcloud auth application-default print-access-token`) or enter it when prompted.
gcp_access_token = os.environ.get('GCP_ACCESS_TOKEN') or getpass('GCP access token: ')
# Project billed for the read; set GOOGLE_CLOUD_PROJECT or edit the placeholder.
gcp_project = os.environ.get('GOOGLE_CLOUD_PROJECT', '<your GCP project here>')

df = (
    spark.read
    .format("bigquery")
    .option("gcpAccessToken", gcp_access_token)
    .option("parentProject", gcp_project)
    .load("bigquery-public-data.samples.shakespeare")
)

# Total word count per corpus, top three.
(
    df.groupBy('corpus')
    .sum('word_count')
    .orderBy('sum(word_count)', ascending=False)
    .show(3)
)

In [None]:
from pyspark.sql import SparkSession

# The previous cell already started a session; getOrCreate() would return it
# unchanged, so stop it before requesting the connector via Maven coordinates.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

# NOTE: spark.jars.packages is resolved when the driver JVM is launched, so for
# it to take effect, restart the kernel if Spark was already started in it.
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.24.2')
    .getOrCreate()
)
spark

### Lesson #6: It’s dangerous to go alone! Take your helper.py along.

In [None]:
import importlib

# Both helper modules share the same source: get_module() returns the __name__
# of the module it was imported as.
MODULE_SOURCE = '''def get_module():
    return __name__
'''
for module_name in ('helper', 'utils'):
    with open(f'./{module_name}.py', 'w') as module_file:
        module_file.write(MODULE_SOURCE)

# The import system caches directory listings; modules created after the
# interpreter started may not be found unless those caches are invalidated.
importlib.invalidate_caches()
import helper
import utils
f'{helper.get_module()}, {utils.get_module()}'

In [None]:
import os
from pyspark.sql import SparkSession

# A session from an earlier lesson (possibly using the packed conda interpreter)
# would be returned unchanged by getOrCreate(), so stop it before resetting
# PYSPARK_PYTHON; the variable only applies to a newly created SparkContext.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()

# Executors use the `python3` on their PATH again.
os.environ['PYSPARK_PYTHON'] = 'python3'
spark = (
    SparkSession.builder
    .master('yarn')
    .getOrCreate()
)
spark

In [None]:
import zipfile

# Bundle both modules with the stdlib rather than the external `zip` tool, which
# may not be installed; mode 'w' rebuilds the archive from scratch on every run.
with zipfile.ZipFile('friends.zip', 'w', compression=zipfile.ZIP_DEFLATED) as archive:
    for module_path in ('utils.py', 'helper.py'):
        archive.write(module_path)

# Register the zip as a Python dependency for tasks run on this SparkContext.
spark.sparkContext.addPyFile('./friends.zip')

In [None]:
from pyspark.sql.functions import udf


def get_module_names():
    """Import the shipped helper modules where the UDF runs and report their names."""
    # Both imports must succeed on the worker, i.e. the modules must have been shipped there.
    import utils
    import helper
    return f'{helper.get_module()}, {utils.get_module()}'


py_udf = udf(get_module_names)
(spark.range(3)
      .withColumn("newCol", py_udf())
      .show(truncate=False))