# Setup
Install Java (I am on 11), Spark and Python (I have 3.8). 

Install Jupyter server and jupyter plugin in vscode.

Set env variables as below (Change values according to your setup)

```
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export SPARK_HOME=/home/kamal/spark-3.1.2-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYTHONPATH=${SPARK_HOME}/python/:$(echo ${SPARK_HOME}/python/lib/py4j-*-src.zip):${PYTHONPATH}
```


# Create Sessions
## Create Spark Session

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the session that the non-Delta examples below run on.
session_builder = SparkSession.builder.appName("airline")
spark = session_builder.getOrCreate()


## Create Delta spark session
* This requires the Delta Lake jar to be downloaded from the Maven repository and placed in SPARK_HOME/jars. Restart the Jupyter server after that.  
* Install delta-spark package (pip install delta-spark)

In [None]:
from delta import *

# Delta needs its SQL extension and its catalog implementation set on the builder.
# NOTE(review): getOrCreate() may hand back an already-running session, in which
# case these settings might not all take effect - confirm against your setup.
delta_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}
builder = SparkSession.builder.appName("MyApp")
for setting_name, setting_value in delta_settings.items():
    builder = builder.config(setting_name, setting_value)

delta_spark = configure_spark_with_delta_pip(builder).getOrCreate()


# Read and operate on Data
## Read CSV with string schema
Data used here is available from databricks github [page](https://github.com/databricks/LearningSparkV2.git)

In [None]:
csv_file = "./databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
schema = "date string, delay int, distance int, origin string, destination string"

# Read with an explicit DDL-string schema and keep the resulting DataFrame cached.
csv_reader = spark.read.schema(schema).option("header", "true")
df = csv_reader.csv(csv_file).cache()

df.printSchema()
df.first()

## Operate on Columns, with col

In [None]:
# `col` has not been imported by any earlier cell, so import it here to keep
# this cell runnable when the notebook is executed top to bottom.
from pyspark.sql.functions import col

# Divide the distance column by 1000 and preview the first two rows.
df.select(col("distance") / 1000).show(2)

## Using withColumn()

In [None]:
# `col` has not been imported by any earlier cell, so import it here to keep
# this cell runnable when the notebook is executed top to bottom.
from pyspark.sql.functions import col

# Append the derived column next to the existing ones and preview two rows.
df.withColumn("calculatedCol", col("distance") / 1000).show(2)

## Timestamp from millisecond (since epoch)

In [None]:
# Convert an epoch-milliseconds literal into a timestamp column via Spark SQL.
epoch_millis_sql = "select timestamp_millis(1631163333673) as timestampdata"
spark.sql(epoch_millis_sql).show(truncate=False)

## Working with JSON

### Extract with from_json()

In [None]:
# `from_json` is not imported by any earlier cell; import it explicitly so the
# cell does not fail with NameError when the notebook runs in order.
from pyspark.sql.functions import from_json

data = [(1, '''{"a": 1,"b": 2.3}''')]
schema = 'a INT, b FLOAT'
df_json = spark.createDataFrame(data, ("key", "value"))

# Build the parse expression once and read both fields from the resulting struct.
parsed = from_json(df_json.value, schema)
df_json.select(parsed["a"].alias("id"), parsed["b"].alias("value")).show()

### Extract with get_json_object()

In [None]:
# `get_json_object` is not imported by any earlier cell; import it explicitly
# so the cell does not fail with NameError when the notebook runs in order.
from pyspark.sql.functions import get_json_object

# Extract individual fields directly from the JSON string by path.
df_json.select(
    get_json_object("value", "$.a").alias("id"),
    get_json_object("value", "$.b").alias("value"),
).show()

# Filter Dataframe

In [None]:
from pyspark.sql.functions import col

df.printSchema()

# Distinct (distance, origin, destination) rows whose distance exceeds 1000.
long_routes = df.select("distance", "origin", "destination").filter(col("distance") > 1000)
long_routes.distinct().show(5)

# Create Views
* Global views are used, in queries, with "global_temp" prefix.

In [None]:
# Register the same DataFrame as a temp view and as a global temp view.
temp_view_name, global_view_name = "Airline", "GlobalAirlineView"
df.createOrReplaceTempView(temp_view_name)
df.createOrReplaceGlobalTempView(global_view_name)

In [None]:
# Preview ten rows through the temp view.
preview_sql = "select * from Airline limit 10"
spark.sql(preview_sql).show()

In [None]:
# Bucket each flight into a named delay category; rows are ordered by origin
# and, within an origin, by delay descending.
delay_buckets_sql = """SELECT delay, origin, destination, 
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
                  WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
                  WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Flight_Delays
               FROM Airline
               ORDER BY origin, delay DESC"""
spark.sql(delay_buckets_sql).show(10)

In [None]:
# Same idea as a three-way label: EARLY, ON-TIME or DELAY, added as FL_Delay.
fl_delay_sql = """SELECT *,
                CASE
                    WHEN delay < 0 THEN 'EARLY'
                    WHEN delay == 0 THEN 'ON-TIME'
                    ELSE 'DELAY'
                END AS FL_Delay
             FROM Airline"""
spark.sql(fl_delay_sql).show(6)

# Add column to DF
## Use multiple conditions on column with when-otherwise

In [None]:
from pyspark.sql.functions import *

# Positive delay -> DELAYED, negative delay -> EARLY, anything else -> ON-TIME.
delay_label = (
    when(col("delay") > 0, 'DELAYED')
    .when(col("delay") < 0, 'EARLY')
    .otherwise('ON-TIME')
)
df.withColumn('FL_delay', delay_label).show(6)

# Set shuffle partitions

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", 4)

# Save the DataFrame as a CSV-format table named Airline, with files at the given path.
csv_table_writer = df.write.option("path", "/home/kamal/airline_data").format("csv")
csv_table_writer.saveAsTable("Airline")

# Partition count of the DataFrame's underlying RDD.
df.rdd.getNumPartitions()

# Create, Use and drop databases

In [None]:
spark.catalog.listDatabases()

# Drop any previous copy of newdb, then create it again at an explicit location.
for ddl_statement in (
    "drop database if exists newdb cascade",
    "create database newdb location '/home/kamal/spark-warehouse/newdatabase'",
):
    spark.sql(ddl_statement)

spark.catalog.listDatabases()

In [None]:
# Switch the session's current database to newdb.
use_newdb_sql = "use newdb"
spark.sql(use_newdb_sql)

# Working with AVRO
The Avro package is still external (but supported), so it needs to be downloaded and placed on the classpath. Download the spark-avro jar from [mavenrepo](https://mvnrepository.com/artifact/org.apache.spark/spark-avro_2.12/3.1.2) and place it in $SPARK_HOME/jars. Then restart the Jupyter server.

In [None]:
# The spark-avro jar must already be on the classpath (placed in $SPARK_HOME/jars
# before the Jupyter server was started). "spark.jars.packages" is a launch-time
# setting, so assigning it on a running session cannot load the package; the
# previous call also named the legacy com.databricks coordinates. It is removed.
#
# The view uses the built-in "avro" data source name provided by spark-avro
# instead of the legacy com.databricks.spark.avro alias.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW episode_tbl
    USING avro
    OPTIONS (
      path "./databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
    )
""")

In [None]:
# Load everything exposed by the Avro-backed view, count it, then preview five rows.
summary_sql = "select * from episode_tbl"
df_fl_summary = spark.sql(summary_sql)
df_fl_summary.count()
df_fl_summary.show(5)

# UDFs

In [None]:
import pandas as pd
from pyspark.sql.types import LongType


def cubed(s: pd.Series) -> pd.Series:
    """Return the element-wise cube of the given pandas Series."""
    squared = s * s
    return squared * s


# Wrap the function as a pandas UDF whose declared return type is LongType.
cubed_udf = pandas_udf(cubed, returnType=LongType())
df_range = spark.range(1, 9)

In [None]:
# Apply the UDF through the DataFrame API.
cubed_column = cubed_udf("id").alias("cubed")
df_range.select("id", cubed_column).show()

In [None]:
# Expose the UDF to Spark SQL under the name cubed_udf.
spark.udf.register(name="cubed_udf", f=cubed_udf)

In [None]:
# Call the registered UDF from SQL over a temp view of the range DataFrame.
df_range.createOrReplaceTempView("udf_test")
udf_sql = "select id, cubed_udf(id) from udf_test"
spark.sql(udf_sql).show()

# Delta Read and Write

In [None]:
# Read the global temp view through the Delta session and overwrite the Delta table.
data = delta_spark.sql("select * from global_temp.GlobalAirlineView").cache()
delta_writer = data.write.format("delta").option("mergeSchema", "true").mode("overwrite")
delta_writer.save("/tmp/delta-table/")

In [None]:
# Load the Delta table back; note that this rebinds `df`.
delta_reader = spark.read.format("delta")
df = delta_reader.load("/tmp/delta-table")
df.show(5)

# Spark Streaming

## foreachBatch Function

foreachBatch lets us use the "update", "append" and "complete" output modes, whereas the file sink only allows append.

In [None]:
from functools import partial


def foreachBatchFunction(format, location, df, batchId):
    """Write one micro-batch to its own directory, overwriting any previous output.

    Intended to be partially applied on ``format`` and ``location`` so that the
    remaining ``(df, batchId)`` signature can be passed to ``foreachBatch``.

    Args:
        format: Output format name handed to ``df.write.format``.
        location: Path prefix; the batch id is appended to it verbatim, so it
            should end with a path separator.
        df: The micro-batch DataFrame.
        batchId: Identifier of the micro-batch, used as the directory name.
    """
    batch_path = location + str(batchId)
    df.write.mode("overwrite").format(format).save(batch_path)


writeMicroBatchUpdateToJSON = partial(foreachBatchFunction, "JSON", "/home/kamal/jsonUpdate/")
writeMicroBatchAppendToJSON = partial(foreachBatchFunction, "JSON", "/home/kamal/jsonAppend/")
writeMicroBatchCompleteToJSON = partial(foreachBatchFunction, "JSON", "/home/kamal/jsonComplete/")
writeMicroBatchAppendToJSON10Sec = partial(foreachBatchFunction, "JSON", "/home/kamal/jsonAppend10sec/")
# Was bound to "JSON", which made the CSV sink write JSON files into the sparkcsv directory.
writeMicroBatchToCSV = partial(foreachBatchFunction, "CSV", "/home/kamal/sparkcsv/")

## Word Count
Generate test data using [datastreamer](https://github.com/skamalj/datagenerator) with the following config. It creates a space-separated list of 6 words in each line.
>`lorem|words|textline|6`

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

sparkstream = SparkSession.builder.appName("airline").getOrCreate()

# Read the socket stream from localhost:4000.
socket_source = (sparkstream.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 4000))
lines = socket_source.load()

# Each record is treated as a JSON string; keep only its "textline" field.
linesnojson = lines.select(get_json_object("value", "$.textline").alias("line"))

# One row per space-separated word, then a count per distinct word.
word_column = explode(split(col("line"), " ")).alias("word")
words = linesnojson.select(word_column)
counts = words.groupBy("word").count()

checkpointDir = "/home/kamal/sparkcheckpoint"
checkpointFileSyncDir = "/home/kamal/sparkcheckpointcsv"

# Alternative console sink - uncomment to use it:
#streamingQuery = (counts
#  .writeStream
#  .format("console")
#  .outputMode("complete")
#  .trigger(processingTime="20 second")
#  .option("checkpointLocation", checkpointDir)
#  .start())

# Hand each micro-batch to the batch writer every 20 seconds, in update mode.
batch_sink = (counts.writeStream
    .foreachBatch(writeMicroBatchToCSV)
    .outputMode("update")
    .trigger(processingTime="20 second")
    .option("checkpointLocation", checkpointFileSyncDir))
streamingQueryFile = batch_sink.start()

In [None]:
# Stop the foreachBatch word-count query. The console query's stop() stays
# commented out because the block that starts it is commented out too.
#streamingQuery.stop()
streamingQueryFile.stop()

## Streaming with eventtime
Use generator with following config
>`datatype|number|id|{"min":0,"max":10}`

>`datatype|float|temp|{"min":20,"max":50}`

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

sparkstream = SparkSession.builder.appName("avgtemp").getOrCreate()

# Read the socket stream and stamp every record with the time Spark saw it.
socket_records = (sparkstream.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 4000)
    .load())
readings = socket_records.withColumn("current_timestamp", current_timestamp())

# "eventtime" is divided by 1000 and rounded before being turned into a timestamp
# (presumably it arrives as epoch milliseconds - confirm against the generator).
event_seconds = round(get_json_object("value", "$.eventtime") / 1000)
readings_tab = readings.select(
    get_json_object("value", "$.id").alias("id"),
    get_json_object("value", "$.temp").alias("temp"),
    timestamp_seconds(event_seconds).alias("eventtime"),
    "current_timestamp",
)

# Derive an event time shifted 10 seconds into the past.
ten_second_interval = expr("INTERVAL 10 seconds")
readings_tab = readings_tab.withColumn("delayed_eventtime", col("eventtime") - ten_second_interval)

eventcol = "delayed_eventtime"

# Per id and 20-second window: sample count, average temperature, and the latest
# processing / event timestamps, with a 10-second watermark on the event column.
window_aggregates = [
    count("id").alias("samples"),
    avg("temp").alias("averageTemp"),
    max("current_timestamp").alias("processed_at"),
    max(eventcol).alias("generated_at"),
]
samples = (readings_tab
    .withWatermark(eventcol, "10 seconds")
    .groupBy("id", window(eventcol, "20 second"))
    .agg(*window_aggregates))

## Comment / uncomment the blocks below as needed

#streamingQuery = (samples
#  .writeStream
#  .format("console")
#  .option("truncate", "false")
#  .outputMode("append")
#  .trigger(processingTime="10 second")
#  .start())
#
#streamingQueryJsonAppend = (samples
#  .writeStream
#  .foreachBatch(writeMicroBatchAppendToJSON10Sec)
#  .outputMode("append")
#  .option("checkpointLocation", "/home/kamal/spark/checkpoint/jsonappend10sec")
#  .trigger(processingTime="10 second")
#  .start())

# Active sink: append mode, one JSON directory per micro-batch, every 20 seconds.
json_append_sink = (samples.writeStream
    .foreachBatch(writeMicroBatchAppendToJSON)
    .outputMode("append")
    .option("checkpointLocation", "/home/kamal/spark/checkpoint/jsonappend")
    .trigger(processingTime="20 second"))
streamingQueryJsonAppend = json_append_sink.start()

#streamingQueryJsonUpdate = (samples
#  .writeStream
#  .foreachBatch(writeMicroBatchUpdateToJSON)
#  .outputMode("update")
#  .option("checkpointLocation", "/home/kamal/spark/checkpoint/jsonupdate")
#  .trigger(processingTime="20 second")
#  .start())

#streamingQueryJsonComplete = (samples
#  .writeStream
#  .foreachBatch(writeMicroBatchCompleteToJSON)
#  .outputMode("complete")
#  .option("checkpointLocation", "/home/kamal/spark/checkpoint/jsoncomplete")
#  .start())

## Stop Streaming Queries

In [None]:
# Stop the append-mode query; enable the update-mode stop() only if that
# query's (currently commented-out) start block was run.
#streamingQueryJsonUpdate.stop()
streamingQueryJsonAppend.stop()

In [None]:
# Load the Delta table at /tmp/deltaeventscomplete and inspect the rows for id 2.
complete_events = spark.read.format("delta").load("/tmp/deltaeventscomplete")
dfc = complete_events.cache()
dfc.filter("id == 2").show(15, truncate=False)


In [None]:
# Load the Delta table at /tmp/deltaevents (rebinding `df`) and inspect the rows for id 2.
delta_events = spark.read.format("delta").load("/tmp/deltaevents")
df = delta_events.cache()
df.filter("id == 2").show(15, truncate=False)

In [None]:

from pyspark.sql.functions import col

# Full outer join of the two event tables on (id, window), putting the sample
# counts from each side next to each other for id 10.
joined = df.join(dfc, ['id', 'window'], "fullOuter")
side_by_side = joined.select(
    "id", "window", dfc.samples.alias("complete"), df.samples, dfc.processed_at
)
side_by_side.orderBy("id").filter("id == 10").show(50, truncate=False)

# Scala exercise from [Sample question](https://databricks-prod-cloudfront.cloud.databricks.com/public/793177bc53e528530b06c78a4fa0e086/0/6221173/100020/latest.html). Translated to PySpark

## Imports and Create DF

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# name: string, department: int, score: array<int>; all fields nullable.
people_fields = [
    StructField("name", StringType(), True),
    StructField("department", IntegerType(), True),
    StructField("score", ArrayType(IntegerType()), True),
]
schema = StructType(people_fields)

people_rows = [
    ("Ali", 0, [100]),
    ("Barbara", 1, [300, 250, 100]),
    ("Cesar", 1, [350, 100]),
    ("Dongmei", 1, [400, 100]),
    ("Eli", 2, [250]),
    ("Florita", 2, [500, 300, 100]),
    ("Gatimu", 3, [300, 100]),
]
peopleDF = spark.createDataFrame(people_rows, schema)

In [None]:
# Show the DataFrame's schema object as the cell output.
peopleDF.schema

## Create windowSpec

In [None]:
from pyspark.sql.window import Window

# Within each department, order rows from highest to lowest exploded score.
per_department = Window.partitionBy("department")
windowSpec = per_department.orderBy(col("escore").desc())


## Execute using windowSpec

In [None]:
# One row per individual score, replacing the array column.
exploded_scores = peopleDF.select("*", explode("score").alias("escore")).drop("score")

# Rank scores inside each department and keep only the rank-1 rows.
ranked_scores = exploded_scores.select(
    "name", "department", "escore", rank().over(windowSpec).alias("maxrank")
)
top_scores = ranked_scores.filter("maxrank == 1").drop("maxrank")

top_scores.orderBy("department").show()