# 0 - Setup Notebook Pod

## 0.1 - Run in Jupyter Bash Terminal
```bash
# create application-default credentials
gcloud auth application-default login
```

# 1 - Initialize SparkSession

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Jars handed to Spark: the GCS connector plus the BigQuery connector.
# The BigQuery jar is chosen by the first character of the PySpark version
# string: "3" selects the _2.12 build, anything else selects the _2.11 build.
spark_jars = [
    "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar",
    (
        "https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
        if pyspark.version.__version__[0] == "3"
        else "https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-latest_2.11.jar"
    ),
]

# Build (or reuse) a local, single-threaded SparkSession.
# The fs.gs.* settings map the gs:// scheme onto the GCS connector classes and
# point it at the application-default credentials file in the notebook home.
spark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.driver.cores", "1")
    .config("spark.driver.memory", "4g")
    .config("spark.jars", ",".join(spark_jars))
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("spark.hadoop.fs.gs.auth.service.account.enable", "true")
    .config("spark.hadoop.fs.gs.auth.service.account.json.keyfile", "/home/jovyan/.config/gcloud/application_default_credentials.json")
    .getOrCreate()
)

# 2 - SparkSQL

## 2.0 - Docs
* https://spark.apache.org/docs/latest/sql-getting-started.html
* https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

## 2.1 - Write CSV

In [None]:
# create a small 4-row, 3-column example DataFrame
df = spark.createDataFrame(
    [("aaa", 1, "!!!"),
     ("bbb", 2, "@@@"),
     ("ccc", 3, "###"),
     ("ddd", 4, "%%%")],
    schema=["col1", "col2", "col3"],
)

# write the DataFrame as CSV (with a header row), replacing any previous output
out_uri = "gs://<<<MY_BUCKET>>>/example/spark_test.csv"
(
    df.write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(out_uri)
)

# link to GUI
# NOTE: str.lstrip("gs://") strips a *set of characters*, not a prefix, so it
# would also eat the leading "g"/"s" of a bucket name; slice the scheme off
# explicitly instead.
out_path = out_uri[len("gs://"):] if out_uri.startswith("gs://") else out_uri
print("----------------")
print("View in GUI:")
print(f"https://console.cloud.google.com/storage/browser/{out_path}/")
print("----------------")

## 2.2 - Read CSV

In [None]:
# location of the CSV output to load back
in_uri = "gs://<<<MY_BUCKET>>>/example/spark_test.csv"

# load the CSV: treat the first row as the header, infer column types,
# and use FAILFAST mode for the parser
df2 = (
    spark.read
    .format("csv")
    .option("mode", "FAILFAST")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(in_uri)
)

# print the loaded rows
df2.show()

# 3 - BigQuery

## 3.0 - Docs
* https://github.com/GoogleCloudDataproc/spark-bigquery-connector

## 3.1 - Write to BigQuery

In [None]:
# create a small 4-row, 3-column example DataFrame
df3 = spark.createDataFrame(
    [("aaa", 1, "!!!"),
     ("bbb", 2, "@@@"),
     ("ccc", 3, "###"),
     ("ddd", 4, "%%%")],
    schema=["col1", "col2", "col3"],
)

# write to BigQuery, replacing the table if it already exists
out_project = "<<<MY_PROJECT>>>"
out_table = "<<<MY_DATABASE>>>.example__spark_notebook"
billing_project = "<<<MY_PROJECT>>>"
(
    df3.write
    .format("bigquery")
    .mode("overwrite")
    # same <<<MY_BUCKET>>> placeholder as the other cells, so a single
    # find-and-replace fills in every occurrence
    .option("temporaryGcsBucket", "<<<MY_BUCKET>>>")
    .option("parentProject", billing_project)
    .option("project", out_project)
    .option("table", out_table)
    .save()
)

# link to GUI
# (Python f-strings interpolate with {name}; a leading "$" would be printed literally)
print("----------------")
print("View in GUI:")
print(f"https://console.cloud.google.com/bigquery?project={out_project}")
print("----------------")

## 3.2 - Read from BigQuery

In [None]:
# table to load, and the project passed to the connector as parentProject
in_project = "<<<MY_PROJECT>>>"
in_table = "<<<MY_DATABASE>>>.example__spark_notebook"
billing_project = "<<<MY_PROJECT>>>"

# load the table through the BigQuery connector, requesting the ARROW read format
df4 = (
    spark.read
    .format("bigquery")
    .option("readDataFormat", "ARROW")
    .option("parentProject", billing_project)
    .option("project", in_project)
    .option("table", in_table)
    .load()
)

# print the loaded rows
df4.show()

# 4 - Advanced Functions

## 4.1 - Write File (Hadoop Java API)

In [None]:
def hadoop_write_file(spark: SparkSession,
                      fs_uri: str,
                      overwrite: bool,
                      file_data: str,
                      encoding: str = "utf-8") -> str:
    """
    Write a string as a file using the Hadoop Java API.

    :param spark: a running SparkSession
    :param fs_uri: the URI of the file
    :param overwrite: if we should replace any existing file (error if False)
    :param file_data: the string to write as the file data
    :param encoding: the encoding used to turn `file_data` into bytes (defaults to utf-8)
    :return: the URI of the written file
    """
    # create py4j wrappers of java objects
    hadoop = spark.sparkContext._jvm.org.apache.hadoop
    java = spark.sparkContext._jvm.java

    # create the FileSystem() object
    conf = spark._jsc.hadoopConfiguration()
    path = hadoop.fs.Path(java.net.URI(fs_uri))
    fs = path.getFileSystem(conf)

    # encode before the file is created, so an encoding error cannot leave an
    # empty or truncated file behind
    payload = file_data.encode(encoding)

    # write the file
    # NOTE: DataOutputStream.writeBytes() keeps only the low 8 bits of each
    # char and would corrupt non-ASCII text, so the encoded bytes are written
    # instead (py4j passes Python `bytes` to Java as byte[]).
    output_stream = fs.create(path, overwrite)
    try:
        output_stream.write(payload)
    finally:
        # always release the stream, even if the write fails
        output_stream.close()

    return fs_uri

# write file
out_uri = "gs://<<<MY_BUCKET>>>/example/spark_test.txt"
file_data = "Hello World! " * 100
hadoop_write_file(spark=spark, fs_uri=out_uri, overwrite=True, file_data=file_data)

# link to GUI
# The link is built from the file's own URI (not the BigQuery project name).
# The "gs://" scheme is sliced off explicitly because str.lstrip("gs://")
# strips a set of characters and would also eat a leading "g"/"s" of the
# bucket name.
out_path = out_uri[len("gs://"):] if out_uri.startswith("gs://") else out_uri
print("----------------")
print("View in GUI:")
print(f"https://console.cloud.google.com/storage/browser/{out_path}")
print("----------------")

## 4.2 - Read File (Hadoop Java API)

In [None]:
def hadoop_read_file(spark: SparkSession,
                     fs_uri: str,
                     encoding: str = "utf-8") -> Optional[str]:
    """
    Read the content of a file as a string using the Hadoop Java API.

    :param spark: a running SparkSession
    :param fs_uri: the URI of the file
    :param encoding: the file's encoding (defaults to utf-8)
    :return: the content of the file (or None if the file is not present)
    :raises Py4JJavaError: for any Java exception other than
        java.io.FileNotFoundException
    """
    from py4j.protocol import Py4JJavaError

    # create py4j wrappers of java objects
    commons = spark.sparkContext._jvm.org.apache.commons
    hadoop = spark.sparkContext._jvm.org.apache.hadoop
    java = spark.sparkContext._jvm.java

    # create the FileSystem() object
    conf = spark._jsc.hadoopConfiguration()
    path = hadoop.fs.Path(java.net.URI(fs_uri))
    fs = path.getFileSystem(conf)

    # read file as string
    try:
        input_stream = fs.open(path)
        try:
            return commons.io.IOUtils.toString(input_stream, encoding)
        finally:
            # always release the stream, even if decoding/reading fails
            input_stream.close()
    except Py4JJavaError as ex:
        # a missing file is reported as None; everything else propagates
        java_exception_class = ex.java_exception.getClass().getName()
        if java_exception_class == "java.io.FileNotFoundException":
            return None
        raise

# fetch the text file back (hadoop_read_file returns None when it is missing)
in_uri = "gs://<<<MY_BUCKET>>>/example/spark_test.txt"
file_data = hadoop_read_file(spark=spark, fs_uri=in_uri)

# show the content between banner lines
print(
    "-------- File Content --------",
    file_data,
    "------------------------------",
    sep="\n",
)