# Pre-requisite
- We need to create a data lake account and also upload the input file to it
- Also need to create Service Principal and then allow access to SP on the Data Lake container which contains input files

## Mount Data from Data Lake

In [0]:
# Specify the Data Lake account details
adlsAccountName = "sadlmay22"
adlsContainerName = "inputdata"
adlsFolderName = "data"

In [0]:
# Specify details of SP which will be used in this notebook to connect to the Data Lake account
applicationId = "4772ddf7-5e2a-40fe-a718-6199adcb66f4"
authenticationKey = "o128Q~jUYSDrNIXMTdbnqfxKY6X7jlStMU3IkaVg"
tenandId = "6bb2f9af-a0af-4c32-a5ec-5f7011d37551"

In [0]:
endpoint = "https://login.microsoftonline.com/" + tenandId + "/oauth2/token"
source = "abfss://" + adlsContainerName + "@" + adlsAccountName + ".dfs.core.windows.net/"

In [0]:
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": applicationId,
           "fs.azure.account.oauth2.client.secret": authenticationKey,
           "fs.azure.account.oauth2.client.endpoint": endpoint}

In [0]:
dbutils.fs.unmount("/mnt/data")
print("Source: ", source)
print("configs: ", configs)
print("endpoint: ", endpoint)

print("applicationId: ", applicationId)
print("authenticationKey: ", authenticationKey)
print("tenandId: ", tenandId)

print("adlsAccountName: ", adlsAccountName)
print("adlsContainerName: ", adlsContainerName)

print("adlsFolderName: ", adlsFolderName)

In [0]:
try:
  dbutils.fs.mount(source = source,mount_point = "/mnt/data",extra_configs = configs)
except Exception as e:
  print(e)
  #pass

In [0]:
%fs
ls /mnt/data/

path,name,size,modificationTime
dbfs:/mnt/data/data/,data/,0,1652752896000


In [0]:
from pyspark.sql.functions import col, column, spark_partition_id

In [0]:
flightData2015="dbfs:/mnt/data/data/flight-data/csv/2010-summary.csv"
flightData2015="dbfs:/mnt/data/data/flight-data/csv"

In [0]:
flightDataDF2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv(flightData2015)

flightDataDF2015=flightDataDF2015.withColumn("partitionID", spark_partition_id())
flightDataDF2015.rdd.getNumPartitions()

In [0]:
flightDataDF2015.show(5000)

In [0]:
from pyspark.sql.functions import concat, collect_list, collect_set, concat_ws

#Notice that values in the column - DEST_COUNTRY_NAME are spead over all partitions. It will create shuffling issues if we do any operation based on DEST_COUNTRY_NAME
flightDataDF2015.groupBy("DEST_COUNTRY_NAME")\
.agg(concat_ws(", ", collect_set(flightDataDF2015.partitionID))).display(100)

DEST_COUNTRY_NAME,"concat_ws(, , collect_set(partitionID))"
Algeria,"0, 1"
Angola,"0, 1"
Anguilla,"0, 1"
Antigua and Barbuda,"0, 1"
Argentina,"0, 1"
Aruba,"0, 1"
Australia,"0, 1"
Austria,"0, 1"
Azerbaijan,"0, 1"
Bahrain,"0, 1"


## Repartition based on a column using which we need to do group by

In [0]:
partition_count=flightDataDF2015.select('DEST_COUNTRY_NAME').distinct().count()
print(partition_count)

In [0]:
flightDataDF2015_rp=flightDataDF2015.repartition(partition_count, col("DEST_COUNTRY_NAME")).withColumn("partitionID", spark_partition_id())
flightDataDF2015_rp.sort("DEST_COUNTRY_NAME").show(5000)

In [0]:
flightDataDF2015_rp.sort("partitionID").show(5000)

In [0]:
flightDataDF2015_rp.rdd.getNumPartitions()

In [0]:
# Notice that one column is aligned to a single partition. It will avoid shuffling issues
flightDataDF2015_rp.groupBy("DEST_COUNTRY_NAME")\
.agg(concat_ws(", ", collect_set(flightDataDF2015_rp.partitionID))).display(5000)

#flightDataDF2015_rp.sort("partitionID").show(20000)

DEST_COUNTRY_NAME,"concat_ws(, , collect_set(partitionID))"
Equatorial Guinea,0
Romania,1
Switzerland,3
Guyana,4
Belgium,5
United Arab Emirates,8
Gibraltar,9
New Zealand,10
Liberia,10
Norway,12


In [0]:
# Notice that one column is aligned to a single partition. It will avoid shuffling issues
flightDataDF2015_rp.groupBy("partitionID")\
.agg(concat_ws(", ", collect_set(flightDataDF2015_rp.partitionID))).display(5000)

#flightDataDF2015_rp.sort("partitionID").show(20000)

partitionID,"concat_ws(, , collect_set(partitionID))"
0,0
1,1
3,3
4,4
5,5
8,8
9,9
10,10
12,12
13,13


## Execution plan comparision between SQL Way and Datafram Way

In [0]:
flightDataDF2015.createOrReplaceTempView("flight_data_2015")
sqlWay = spark\
.sql("""
select dest_country_name, count(1)
from flight_data_2015
group by dest_country_name
""")

In [0]:
dataFrameWay = flightDataDF2015\
.groupBy("dest_country_name")\
.count()

In [0]:
# The physical execution plans are almost ame either way
sqlWay.explain()
dataFrameWay.explain()

## Grouping on regular dataframe

In [0]:
flightDataDF2015_groupBy=flightDataDF2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")

In [0]:
# Only 3 partitions are created and group by will result in shuffling
flightDataDF2015.rdd.getNumPartitions()

In [0]:
# Let's save the results. This will initiate Action and also to the data shuffling
dbutils.fs.rm("/mnt/data/data/gprby_DEST_COUNTRY_NAME",True)
#Here data shuffleing will happen which is an expensive operation
flightDataDF2015_groupBy.write.mode('overwrite').format("csv").option("header", "true").save("dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME")

In [0]:
%fs
ls /mnt/data/data/gprby_DEST_COUNTRY_NAME

path,name,size,modificationTime
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/_SUCCESS,_SUCCESS,0,1652754871000
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/_committed_6909895664546789976,_committed_6909895664546789976,113,1652754871000
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/_started_6909895664546789976,_started_6909895664546789976,0,1652754870000
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/part-00000-tid-6909895664546789976-c0153a05-4972-45ee-91be-0c9730c02e08-601-1-c000.csv,part-00000-tid-6909895664546789976-c0153a05-4972-45ee-91be-0c9730c02e08-601-1-c000.csv,2026,1652754870000


## Grouping on repartitioned dataframe

In [0]:
# No/Minimal shuffling will happen
flightDataDF2015_rp_groupBy=flightDataDF2015_rp\
.groupBy("dest_country_name")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")

In [0]:
dbutils.fs.rm("/mnt/data/data/gprby2_DEST_COUNTRY_NAME",True)

In [0]:
# Here the RDD Action will trigger and notice that multiple files will be saved, one file each partition
flightDataDF2015_rp_groupBy.write.mode('overwrite').format("csv").option("header", "true").save("dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME")

In [0]:
%fs
ls /mnt/data/data/gprby2_DEST_COUNTRY_NAME

path,name,size,modificationTime
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/_SUCCESS,_SUCCESS,0,1652754884000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/_committed_2248240904309429254,_committed_2248240904309429254,8657,1652754884000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/_started_2248240904309429254,_started_2248240904309429254,0,1652754880000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00000-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-604-1-c000.csv,part-00000-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-604-1-c000.csv,56,1652754880000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00001-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-605-1-c000.csv,part-00001-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-605-1-c000.csv,47,1652754880000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00003-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-607-1-c000.csv,part-00003-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-607-1-c000.csv,52,1652754880000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00004-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-608-1-c000.csv,part-00004-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-608-1-c000.csv,46,1652754880000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00005-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-609-1-c000.csv,part-00005-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-609-1-c000.csv,48,1652754880000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00008-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-612-1-c000.csv,part-00008-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-612-1-c000.csv,61,1652754880000
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00009-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-613-1-c000.csv,part-00009-tid-2248240904309429254-96d27279-88e9-43be-8021-a0d170df080a-613-1-c000.csv,48,1652754880000


## Dealing with Manual / Static Schema

In [0]:
staticSchema = flightDataDF2015.schema

In [0]:
print(staticSchema)

In [0]:
flightDataDF2015 = spark\
.read\
.schema(staticSchema)\
.option("header", "true")\
.csv(flightData2015)

In [0]:
flightDataDF2015.display()

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,partitionID
United States,Romania,1,
United States,Ireland,264,
United States,India,69,
Egypt,United States,24,
Equatorial Guinea,United States,1,
United States,Singapore,25,
United States,Grenada,54,
Costa Rica,United States,477,
Senegal,United States,29,
United States,Marshall Islands,44,


In [0]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [0]:
myManualSchema = StructType([\
                            StructField("DEST_COUNTRY_NAME", StringType(), nullable=False),\
                            StructField("ORIGIN_COUNTRY_NAME", StringType(), nullable=False),\
                            StructField("count", StringType(), nullable=False, metadata={"hello":"world"})\
                            ])
df1 = spark.read.format("csv").schema(myManualSchema)\
.load(flightData2015)


In [0]:
df1.display()

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29


In [0]:
flightDataDF2015.rdd.getNumPartitions()

## Can specify to create multiple files, one for each value in the columns

In [0]:
# Slow Performance as all data is to be reshuffled
dbutils.fs.rm("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME", True)
flightDataDF2015.write.partitionBy("DEST_COUNTRY_NAME").format("csv").save("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME")

In [0]:
%fs
ls /mnt/data/data/P_By_DEST_COUNTRY_NAME

path,name,size,modificationTime
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Afghanistan/,DEST_COUNTRY_NAME=Afghanistan/,0,1652754897000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Algeria/,DEST_COUNTRY_NAME=Algeria/,0,1652754897000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Angola/,DEST_COUNTRY_NAME=Angola/,0,1652754897000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Anguilla/,DEST_COUNTRY_NAME=Anguilla/,0,1652754897000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Antigua and Barbuda/,DEST_COUNTRY_NAME=Antigua and Barbuda/,0,1652754897000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Argentina/,DEST_COUNTRY_NAME=Argentina/,0,1652754897000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Aruba/,DEST_COUNTRY_NAME=Aruba/,0,1652754898000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Australia/,DEST_COUNTRY_NAME=Australia/,0,1652754898000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Austria/,DEST_COUNTRY_NAME=Austria/,0,1652754898000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Azerbaijan/,DEST_COUNTRY_NAME=Azerbaijan/,0,1652754898000


In [0]:
# Fast Performance as all data is spread across partitions with each executer
dbutils.fs.rm("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2", True)
flightDataDF2015_rp.write.partitionBy("DEST_COUNTRY_NAME").format("csv").save("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2")

In [0]:
%fs
ls /mnt/data/data/P_By_DEST_COUNTRY_NAME_2

path,name,size,modificationTime
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Algeria/,DEST_COUNTRY_NAME=Algeria/,0,1652754978000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Angola/,DEST_COUNTRY_NAME=Angola/,0,1652754980000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Anguilla/,DEST_COUNTRY_NAME=Anguilla/,0,1652754980000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Antigua and Barbuda/,DEST_COUNTRY_NAME=Antigua and Barbuda/,0,1652754977000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Argentina/,DEST_COUNTRY_NAME=Argentina/,0,1652754981000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Aruba/,DEST_COUNTRY_NAME=Aruba/,0,1652754981000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Australia/,DEST_COUNTRY_NAME=Australia/,0,1652754981000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Austria/,DEST_COUNTRY_NAME=Austria/,0,1652754981000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Azerbaijan/,DEST_COUNTRY_NAME=Azerbaijan/,0,1652754981000
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Bahrain/,DEST_COUNTRY_NAME=Bahrain/,0,1652754981000


In [0]:
# Multiple executers will load the data paralally
df1 = spark.read.format("csv").schema(myManualSchema)\
.load("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2")

In [0]:
# Multiple executers will load the data paralally
df1 = spark.read.format("csv").schema(myManualSchema)\
.load("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2")

In [0]:
df1.createOrReplaceTempView("flight_data_2015")
sqlWay = spark\
.sql("""
select *
from flight_data_2015
where dest_country_name='United States'
""")

In [0]:
sqlWay.show(10000000)

In [0]:
# Notice the number of partitions created
df1.rdd.getNumPartitions()

In [0]:
df1.display()

ORIGIN_COUNTRY_NAME,count,DEST_COUNTRY_NAME
Romania,12,United States
Croatia,1,United States
Ireland,266,United States
India,60,United States
Niger,1,United States
Singapore,22,United States
Grenada,40,United States
Sint Maarten,260,United States
Marshall Islands,33,United States
Paraguay,15,United States
