# Spark Environment Options and Dependencies

* This notebook shows how to configure Spark to run in a local environment and how to manage the dependencies and options required to read and write data in S3-compatible object storage.

## Spark Session

* SparkSession is the entry point to Spark SQL and one of the first objects you create in a Spark SQL application.
* You create a SparkSession through SparkSession.builder, which exposes the Builder API used to configure the session before it starts.
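
The Builder API described above can be wrapped in a small helper. This is a minimal sketch, not the notebook's own method: `local_master` and `build_session` are hypothetical names, and the helper leaves out Hive support, which the session cell enables separately. It relies on the local master URL forms: "local" runs one worker thread, "local[N]" runs N threads, and "local[*]" runs one thread per logical core.

```python
def local_master(threads=None):
    """Return a Spark master URL for local mode.

    None maps to "local[*]" (one thread per logical core);
    a positive integer N maps to "local[N]".
    """
    if threads is None:
        return "local[*]"
    if threads < 1:
        raise ValueError("threads must be a positive integer")
    return f"local[{threads}]"


def build_session(app_name, threads=None, options=None):
    """Create (or reuse) a SparkSession with optional extra settings.

    `options` is a plain dict of Spark configuration keys and values.
    """
    # Imported lazily so local_master() can be used without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master(local_master(threads)).appName(app_name)
    for key, value in (options or {}).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

With PySpark installed, `build_session("Hive-Test", threads=2)` would return a session that uses two local threads. Because `getOrCreate()` reuses a running session, options passed this way only take full effect when no session exists yet.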

In [3]:
################################################################################
# Standard library
import hashlib
import json
import os
import sys
import urllib.request
from datetime import date, timedelta
from itertools import islice

# Spark and Delta Lake (explicit names instead of wildcard imports, which
# shadow Python built-ins such as sum, min and max)
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import col, from_unixtime, to_timestamp, udf
from delta.tables import DeltaTable
################################################################################

In [4]:
# Build (or reuse) a local Spark session with Hive support enabled
spark = (
    SparkSession.builder
    .master("local")
    .appName("Hive-Test")
    .enableHiveSupport()
    .getOrCreate()
)


# Get configurations
# configurations = spark.sparkContext.getConf().getAll()
# for item in configurations: print(item)
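
The commented-out lines above print every configuration entry. When only one group of settings is of interest, the list of (key, value) tuples returned by `getAll()` can be filtered by key prefix. The sketch below uses a hypothetical helper, `filter_conf`, and an illustrative `sample` list rather than values captured from this session.

```python
def filter_conf(pairs, prefix):
    """Return the (key, value) pairs whose key starts with `prefix`, sorted by key."""
    return sorted((key, value) for key, value in pairs if key.startswith(prefix))


# Illustrative data in the same shape as spark.sparkContext.getConf().getAll();
# these values are assumptions, not output captured from the notebook.
sample = [
    ("spark.master", "local"),
    ("spark.app.name", "Hive-Test"),
    ("spark.sql.catalogImplementation", "hive"),
]

for key, value in filter_conf(sample, "spark.sql."):
    print(f"{key} = {value}")
```

Against a live session the call would be `filter_conf(spark.sparkContext.getConf().getAll(), "spark.hadoop.")`, which is handy for checking that object storage settings were actually applied.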

## Read Data From S3

In [7]:
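# Read the CSV file from object storage into a DataFrame (the first row is the header).
# Note: no Hadoop FileSystem is registered for the "s3" scheme in this session,
# so this call fails with the error shown in the output below.
customers = spark.read.option("header",True).csv("s3://d2b-internal-assessment-bucket/orders_data/orders.csv")

# Show the first 5 rows
customers.show(5)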

23/04/24 14:25:29 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3://d2b-internal-assessment-bucket/orders_data/orders.csv.
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(

Py4JJavaError: An error occurred while calling o68.csv.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:747)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:745)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:577)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:571)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
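
The error above means Hadoop has no FileSystem implementation registered for the "s3" scheme. A common remedy is to use the S3A connector: put the hadoop-aws package on the classpath, address the data with "s3a://" URIs, and pass the endpoint and credentials as Spark settings. The sketch below only builds those settings. `to_s3a` and `s3a_options` are hypothetical helpers, and the endpoint, keys, and connector version are assumptions the reader must supply; the hadoop-aws version should match the Hadoop build bundled with Spark.

```python
def to_s3a(uri):
    """Rewrite an s3:// or s3n:// URI to the s3a:// scheme; leave others unchanged."""
    for scheme in ("s3://", "s3n://"):
        if uri.startswith(scheme):
            return "s3a://" + uri[len(scheme):]
    return uri


def s3a_options(endpoint, access_key, secret_key, hadoop_aws_version):
    """Build Spark settings for the S3A connector against an S3-compatible store."""
    return {
        # Downloads the connector at startup; the version is supplied by the caller.
        "spark.jars.packages": f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}",
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        # Self-hosted stores such as MinIO are usually addressed as host/bucket.
        "spark.hadoop.fs.s3a.path.style.access": "true",
    }
```

These settings need to be passed to the builder with `.config(key, value)` before the first session is created, because `spark.jars.packages` is resolved when the JVM starts. After that, a read would use `to_s3a("s3://d2b-internal-assessment-bucket/orders_data/orders.csv")` as its path.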


## Hive Metastore Initialization

In [6]:
spark.sql("show databases;").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



## Register the Data as a Temporary View

In [10]:
# Register the DataFrame as a temporary view so it can be queried with Spark SQL
customers.createOrReplaceTempView("customers")

## Perform Transformations on Data

In [3]:
##########################################
# Performing Transformations 
##########################################

uk_customers = spark.sql("""
    SELECT *
    FROM customers
    WHERE country = 'United Kingdom'
    ORDER BY customer_id ASC
    LIMIT 100
""")
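
The same transformation can also be written with the DataFrame API instead of a SQL string. This is a sketch of an equivalent form, assuming the DataFrame has the `country` and `customer_id` columns used in the query above; `uk_customers_df` is a hypothetical helper name.

```python
def uk_customers_df(customers, limit=100):
    """Return up to `limit` United Kingdom rows ordered by customer_id (ascending)."""
    return (
        customers
        .filter("country = 'United Kingdom'")  # SQL expression string, same as WHERE
        .orderBy("customer_id")                # ascending is the default sort order
        .limit(limit)
    )
```

Calling `uk_customers_df(customers)` should give the same rows as the SQL version. If the CSV is read without schema inference, as in the read cell, `customer_id` is a string column and sorts lexicographically in both forms.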

## Writing Results to S3 Object Storage (Minio)

### Writing in CSV Format

In [None]:
##########################################
# Writing Results to S3
##########################################
uk_customers.write.option("header","true").csv("s3a://silver/CSV/jupyter/United-Kingdom-Customers")
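
The CSV write above uses the writer's default save mode, errorifexists, so running the cell a second time against the same path raises an error. The sketch below makes the mode explicit; `write_csv` is a hypothetical helper, not part of the notebook.

```python
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_csv(df, path, mode="errorifexists"):
    """Write `df` as CSV with a header row, using an explicit save mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    df.write.mode(mode).option("header", "true").csv(path)
```

For a rerunnable notebook, `write_csv(uk_customers, "s3a://silver/CSV/jupyter/United-Kingdom-Customers", mode="overwrite")` would replace the previous output instead of failing.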

### Writing in Delta Format

In [None]:
##########################################
# Writing Results to S3 in Delta Format
##########################################
# overwriteSchema lets the overwrite replace the table schema as well as the data
uk_customers.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://silver/Delta/customers")
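
Writing in Delta format requires a session started with the Delta Lake SQL extension and catalog, which the session cell in this notebook does not set. The sketch below shows one way to add them, assuming the delta-spark pip package is installed; `delta_options` and `build_delta_session` are hypothetical helper names.

```python
def delta_options():
    """Return the Spark settings that enable Delta Lake in a session."""
    return {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    }


def build_delta_session(app_name, master="local"):
    """Create a SparkSession with Delta Lake enabled."""
    # Imported lazily so delta_options() can be used without Spark installed.
    from delta import configure_spark_with_delta_pip
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master(master).appName(app_name)
    for key, value in delta_options().items():
        builder = builder.config(key, value)
    # Adds the Delta Lake jars that match the installed delta-spark package.
    return configure_spark_with_delta_pip(builder).getOrCreate()
```

These settings have to be in place before the first session is created; an already running session would need to be stopped with `spark.stop()` and rebuilt.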