In [None]:
# Databricks notebook source
#from databricks.connect import DatabricksSession
#from pyspark.sql.types import IntegerType, StringType, StructField, StructType
#
#spark = DatabricksSession.builder.profile("test").getOrCreate()

There are different types of Cluster Managers when it comes to Spark Applications. You can review [this page](https://spark.apache.org/docs/latest/cluster-overview.html) for more detail.
* Local (only one node for driver as well as executor)
* Cluster
    * YARN
    * Mesos
    * Kubernetes
    * Standalone cluster (Databricks is based on Standalone)

A single-node compute in Databricks has only the driver node, which acts as both master and worker.
- It spawns one executor thread per logical core, minus one core reserved for the driver.

In [None]:
# The /airlines/ folder holds multiple CSV files; list them
%fs ls 'dbfs:/databricks-datasets/asa/airlines'

path,name,size,modificationTime
dbfs:/databricks-datasets/asa/airlines/1987.csv,1987.csv,127162942,1459744248000
dbfs:/databricks-datasets/asa/airlines/1988.csv,1988.csv,501039472,1459744260000
dbfs:/databricks-datasets/asa/airlines/1989.csv,1989.csv,486518821,1459744335000
dbfs:/databricks-datasets/asa/airlines/1990.csv,1990.csv,509194687,1459744384000
dbfs:/databricks-datasets/asa/airlines/1991.csv,1991.csv,491210093,1459744438000
dbfs:/databricks-datasets/asa/airlines/1992.csv,1992.csv,492313731,1459744493000
dbfs:/databricks-datasets/asa/airlines/1993.csv,1993.csv,490753652,1459744545000
dbfs:/databricks-datasets/asa/airlines/1994.csv,1994.csv,501558665,1459744608000
dbfs:/databricks-datasets/asa/airlines/1995.csv,1995.csv,530751568,1459744663000
dbfs:/databricks-datasets/asa/airlines/1996.csv,1996.csv,533922363,1459744718000


In [None]:
# Pointing spark.read.csv at a folder loads all of its CSV files into a single
# DataFrame, provided they share the same schema.
airlines_csv_dir = "dbfs:/databricks-datasets/asa/airlines"
df = spark.read.csv(airlines_csv_dir, header=True)

In [None]:
# Number of partitions backing df; it depends on the available parallelism
# and on the files in the folder.
num_partitions = df.rdd.getNumPartitions()
print(num_partitions)

93


In [None]:
# Row count of one specific CSV file from the folder (1987.csv)
airlines_1987 = spark.read.csv('dbfs:/databricks-datasets/asa/airlines/1987.csv', header=True)
print(airlines_1987.count())

1311826


In [None]:
# .count() returns the number of rows in a DataFrame
total_rows = df.count()
print(total_rows)

123534969


In [None]:
# .columns is the list of column names; its length is the number of columns
column_count = len(df.columns)
print(column_count)

29



* Some operations in Spark can lead to a data **shuffle** between partitions. This is typically done to ensure that records with the same key end up on the same partition. These operations include repartition, join, cogroup, and any of the `*By` or `*ByKey` transformations. **Adaptive Query Execution** (enabled with `spark.conf.set('spark.sql.adaptive.enabled', True)`) allows post-shuffle partitions to be coalesced based on the map output statistics, leading to a more optimized number of partitions in each stage.
* Typically row-oriented formats (csv) are more efficient for queries that either must access most of the columns, or only read a fraction of the rows. Column-oriented formats (parquet), on the other hand, are usually more efficient for queries that need to read most of the rows, but only have to access a fraction of the columns. 

In [None]:
# Read every CSV in the airlines folder, count the rows in each
# (Year, Month, DayOfMonth) group and write the result as parquet under the
# current user's folder.
import getpass

username = getpass.getuser()

# Clear any output left over from a previous run before writing
dbutils.fs.rm(f'/user/{username}/airlines', recurse=True)

airlines_raw = spark.read.csv('dbfs:/databricks-datasets/asa/airlines', header=True)
daily_counts = airlines_raw.groupBy('Year', 'Month', 'DayOfMonth').count()
daily_counts.write.parquet(f'/user/{username}/airlines', mode='overwrite')

In [None]:
# List the parquet output that was written for the current user.
# dbutils.fs.ls is used instead of a hard-coded '/user/root/...' path so the
# listing follows `username`, the same value the write step used.
display(dbutils.fs.ls(f'/user/{username}/airlines'))

path,name,size,modificationTime
dbfs:/user/root/airlines/_SUCCESS,_SUCCESS,0,1709233078000
dbfs:/user/root/airlines/_committed_1466639245947400032,_committed_1466639245947400032,124,1709233078000
dbfs:/user/root/airlines/_started_1466639245947400032,_started_1466639245947400032,0,1709233078000
dbfs:/user/root/airlines/part-00000-tid-1466639245947400032-2938f983-386b-4777-95d2-7db79bd0e129-122-1.c000.snappy.parquet,part-00000-tid-1466639245947400032-2938f983-386b-4777-95d2-7db79bd0e129-122-1.c000.snappy.parquet,41996,1709233078000


In [None]:
# Load the parquet output stored under the current user's folder
parquet_dir = f'/user/{username}/airlines'
df_parquet = spark.read.parquet(parquet_dir)

In [None]:
# Files that back df_parquet (the part file(s) found in the parquet folder)
df_parquet.inputFiles()

['dbfs:/user/root/airlines/part-00000-tid-1466639245947400032-2938f983-386b-4777-95d2-7db79bd0e129-122-1.c000.snappy.parquet']

In [None]:
# Preview the aggregated counts; show() prints only the top 20 rows here
df_parquet.show()

+----+-----+----------+-----+
|Year|Month|DayOfMonth|count|
+----+-----+----------+-----+
|2007|    1|         6|17783|
|2007|    6|        10|20674|
|2007|    8|        28|20975|
|2007|   12|        19|21026|
|2007|    3|         2|21305|
|2007|    4|        30|21032|
|2007|    1|         1|19563|
|2007|   10|        22|21126|
|2007|    7|        31|21589|
|2007|    8|        13|21682|
|2007|   10|         5|21213|
|2007|   10|         9|20556|
|2007|    3|        30|21340|
|2007|    1|        21|19533|
|2007|    5|         4|21148|
|2007|    3|         8|21305|
|2007|    3|        25|20181|
|2007|    3|        31|18316|
|2007|    7|        10|21597|
|2007|   11|        25|20933|
+----+-----+----------+-----+
only showing top 20 rows



* Worker [node] -> Executor(s) -> Slot(s) [logical cores]
* Job -> Stage(s) [parallelizable if no sequential dependency] -> Task(s) [parallelizable]
* Task -> partition + transformation

As tasks are completed, new slots in the executors become available, and the standby tasks are assigned to these available slots.
Running the previous code in single node compute showcases the idea of waiting tasks.

In [None]:
# Master URL of the current session; a local[...] value means driver and
# executor run on the same single node
spark.conf.get('spark.master')

'local[*, 4]'

In [None]:
# Check whether Adaptive Query Execution (AQE) is enabled for this session
spark.conf.get('spark.sql.adaptive.enabled')

'true'

In [None]:
# Configured number of shuffle partitions. With adaptive query execution
# enabled this is only the starting point: post-shuffle partitions may be
# coalesced, so the effective number per stage can be lower.
spark.conf.get('spark.sql.shuffle.partitions')

'200'