# Pre-requisite
- We need to create a data lake account and also upload the input file to it
- Also need to create Service Principal and then allow access to SP on the Data Lake container which contains input files

## Import Data from Data Lake

In [0]:
# Specify the Data Lake account details
adlsAccountName = "dldatabricks21"
adlsContainerName = "inputdata"
adlsFolderName = "data"

In [0]:
# Specify details of SP which will be used in this notebook to connect to the Data Lake account
applicationId = "b9344d96-c48b-426c-8551-68ee134b70ac"
authenticationKey = "cIpuAjg-wP3xKmG0Kel8_pefv7Q546ZfCe"
tenandId = "6bb2f9af-a0af-4c32-a5ec-5f7011d37551"

In [0]:
endpoint = "https://login.microsoftonline.com/" + tenandId + "/oauth2/token"
source = "abfss://" + adlsContainerName + "@" + adlsAccountName + ".dfs.core.windows.net/"

In [0]:
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": applicationId,
           "fs.azure.account.oauth2.client.secret": authenticationKey,
           "fs.azure.account.oauth2.client.endpoint": endpoint}

In [0]:
try:
  dbutils.fs.mount(source = source,mount_point = "/mnt/data",extra_configs = configs)
except Exception as e:
  pass

In [0]:
#dbutils.fs.unmount("/mnt/data")

In [0]:
%fs
ls /mnt/data/data

path,name,size
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/,P_By_DEST_COUNTRY_NAME/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/,P_By_DEST_COUNTRY_NAME_2/,0
dbfs:/mnt/data/data/all-summary.csv,all-summary.csv,12425516
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/,gprby2_DEST_COUNTRY_NAME/,0
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/,gprby_DEST_COUNTRY_NAME/,0


In [0]:
from pyspark.sql.functions import col, column, spark_partition_id

In [0]:
flightData2015="dbfs:/mnt/data/data/all-summary.csv"

In [0]:
flightDataDF2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv(flightData2015)

flightDataDF2015=flightDataDF2015.withColumn("partitionID", spark_partition_id())
flightDataDF2015.rdd.getNumPartitions()

In [0]:
from pyspark.sql.functions import concat, collect_list, collect_set, concat_ws

#Notice that values in the column - DEST_COUNTRY_NAME are spead over all partitions. It will create shuffling issues if we do any operation based on DEST_COUNTRY_NAME
flightDataDF2015.groupBy("DEST_COUNTRY_NAME")\
.agg(concat_ws(", ", collect_set(flightDataDF2015.partitionID))).display(100)

DEST_COUNTRY_NAME,"concat_ws(, , collect_set(partitionID))"
Afghanistan,"0, 1, 2"
Algeria,"0, 1, 2"
Angola,"0, 1, 2"
Anguilla,"0, 1, 2"
Antigua and Barbuda,"0, 1, 2"
Argentina,"0, 1, 2"
Aruba,"0, 1, 2"
Australia,"0, 1, 2"
Austria,"0, 1, 2"
Azerbaijan,"0, 1, 2"


## Repartition based on a column using which we need to do group by

In [0]:
flightDataDF2015_rp=flightDataDF2015.repartition(100, col("DEST_COUNTRY_NAME")).sort("DEST_COUNTRY_NAME").withColumn("partitionID", spark_partition_id())

In [0]:
# Notice that one column is aligned to a single partition. It will avoid shuffling issues
flightDataDF2015_rp.groupBy("DEST_COUNTRY_NAME")\
.agg(concat_ws(", ", collect_set(flightDataDF2015_rp.partitionID))).display(1000)

#flightDataDF2015_rp.sort("partitionID").show(20000)

DEST_COUNTRY_NAME,"concat_ws(, , collect_set(partitionID))"
Afghanistan,0
Algeria,0
Angola,0
Anguilla,1
Antigua and Barbuda,2
Argentina,3
Aruba,4
Australia,4
Austria,5
Azerbaijan,6


## Execution plan comparision between SQL Way and Datafram Way

In [0]:
flightDataDF2015.createOrReplaceTempView("flight_data_2015")
sqlWay = spark\
.sql("""
select dest_country_name, count(1)
from flight_data_2015
group by dest_country_name
""")

In [0]:
dataFrameWay = flightDataDF2015\
.groupBy("dest_country_name")\
.count()

In [0]:
# The physical execution plans are almost ame either way
sqlWay.explain()
dataFrameWay.explain()

## Grouping on regular dataframe and inspect performance

In [0]:
flightDataDF2015_groupBy=flightDataDF2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")

In [0]:
# Only 3 partitions are created and group by will result in shuffling
flightDataDF2015.rdd.getNumPartitions()

In [0]:
# Let's save the results. This will initiate Action and also to the data shuffling
dbutils.fs.rm("/mnt/data/data/gprby_DEST_COUNTRY_NAME",True)
#Here data shuffleing will happen which is an expensive operation
flightDataDF2015_groupBy.write.mode('overwrite').format("csv").option("header", "true").save("dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME")

In [0]:
%fs
ls /mnt/data/data/gprby_DEST_COUNTRY_NAME

path,name,size
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/_SUCCESS,_SUCCESS,0
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/_committed_129582346818394882,_committed_129582346818394882,113
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/_started_129582346818394882,_started_129582346818394882,0
dbfs:/mnt/data/data/gprby_DEST_COUNTRY_NAME/part-00000-tid-129582346818394882-ca1469c7-b336-43e5-a27e-395ba45a0ccb-9312-1-c000.csv,part-00000-tid-129582346818394882-ca1469c7-b336-43e5-a27e-395ba45a0ccb-9312-1-c000.csv,2693


## Grouping on repartitioned dataframe and inspect performance

In [0]:
# No/Minimal shuffling will happen
flightDataDF2015_rp_groupBy=flightDataDF2015_rp\
.groupBy("dest_country_name")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")

In [0]:
dbutils.fs.rm("/mnt/data/data/gprby2_DEST_COUNTRY_NAME",True)

In [0]:
# Here the RDD Action will trigger and notice that multiple files will be saved, one file each partition
flightDataDF2015_rp_groupBy.write.mode('overwrite').format("csv").option("header", "true").save("dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME")

In [0]:
%fs
ls /mnt/data/data/gprby2_DEST_COUNTRY_NAME

path,name,size
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/_SUCCESS,_SUCCESS,0
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/_committed_5078219582153179229,_committed_5078219582153179229,7134
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/_started_5078219582153179229,_started_5078219582153179229,0
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00000-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9316-1-c000.csv,part-00000-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9316-1-c000.csv,66
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00001-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9317-1-c000.csv,part-00001-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9317-1-c000.csv,62
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00002-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9318-1-c000.csv,part-00002-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9318-1-c000.csv,79
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00003-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9319-1-c000.csv,part-00003-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9319-1-c000.csv,66
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00004-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9320-1-c000.csv,part-00004-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9320-1-c000.csv,86
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00006-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9322-1-c000.csv,part-00006-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9322-1-c000.csv,47
dbfs:/mnt/data/data/gprby2_DEST_COUNTRY_NAME/part-00007-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9323-1-c000.csv,part-00007-tid-5078219582153179229-392929a6-e40c-4861-b466-8841a64548e4-9323-1-c000.csv,64


## Dealing with Manual / Static Schema

In [0]:
staticSchema = flightDataDF2015.schema

In [0]:
print(staticSchema)

In [0]:
flightDataDF2015 = spark\
.read\
.schema(staticSchema)\
.option("header", "true")\
.csv(flightData2015)

In [0]:
flightDataDF2015.display()

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,Year,partitionID
United States,Romania,4,679,
United States,Ireland,3,679,
United States,India,3,679,
Egypt,United States,1,679,
Equatorial Guinea,United States,3,679,
United States,Singapore,3,679,
United States,Grenada,4,679,
Costa Rica,United States,4,679,
Senegal,United States,3,679,
United States,Marshall Islands,0,679,


In [0]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [0]:
myManualSchema = StructType([\
                            StructField("DEST_COUNTRY_NAME", StringType(), nullable=False),\
                            StructField("ORIGIN_COUNTRY_NAME", StringType(), nullable=False),\
                            StructField("count", StringType(), nullable=False, metadata={"hello":"world"})\
                            ])
df1 = spark.read.format("csv").schema(myManualSchema)\
.load(flightData2015)


In [0]:
df1.display()

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,4
United States,Ireland,3
United States,India,3
Egypt,United States,1
Equatorial Guinea,United States,3
United States,Singapore,3
United States,Grenada,4
Costa Rica,United States,4
Senegal,United States,3


In [0]:
flightDataDF2015.rdd.getNumPartitions()

## Can specify to create multiple files, one for each value in the columns

In [0]:
# Slow Performance as all data is to be reshuffled
dbutils.fs.rm("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME", True)
flightDataDF2015.write.partitionBy("DEST_COUNTRY_NAME").format("csv").save("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME")

In [0]:
%fs
ls /mnt/data/data/P_By_DEST_COUNTRY_NAME

path,name,size
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Afghanistan/,DEST_COUNTRY_NAME=Afghanistan/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Algeria/,DEST_COUNTRY_NAME=Algeria/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Angola/,DEST_COUNTRY_NAME=Angola/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Anguilla/,DEST_COUNTRY_NAME=Anguilla/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Antigua and Barbuda/,DEST_COUNTRY_NAME=Antigua and Barbuda/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Argentina/,DEST_COUNTRY_NAME=Argentina/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Aruba/,DEST_COUNTRY_NAME=Aruba/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Australia/,DEST_COUNTRY_NAME=Australia/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Austria/,DEST_COUNTRY_NAME=Austria/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME/DEST_COUNTRY_NAME=Azerbaijan/,DEST_COUNTRY_NAME=Azerbaijan/,0


In [0]:
# Fast Performance as all data is spread across partitions with each executer
dbutils.fs.rm("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2", True)
flightDataDF2015_rp.write.partitionBy("DEST_COUNTRY_NAME").format("csv").save("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2")

In [0]:
%fs
ls /mnt/data/data/P_By_DEST_COUNTRY_NAME_2

path,name,size
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Afghanistan/,DEST_COUNTRY_NAME=Afghanistan/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Algeria/,DEST_COUNTRY_NAME=Algeria/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Angola/,DEST_COUNTRY_NAME=Angola/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Anguilla/,DEST_COUNTRY_NAME=Anguilla/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Antigua and Barbuda/,DEST_COUNTRY_NAME=Antigua and Barbuda/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Argentina/,DEST_COUNTRY_NAME=Argentina/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Aruba/,DEST_COUNTRY_NAME=Aruba/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Australia/,DEST_COUNTRY_NAME=Australia/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Austria/,DEST_COUNTRY_NAME=Austria/,0
dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2/DEST_COUNTRY_NAME=Azerbaijan/,DEST_COUNTRY_NAME=Azerbaijan/,0


In [0]:
# Multiple executers will load the data paralally
df1 = spark.read.format("csv").schema(myManualSchema)\
.load("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2")

In [0]:
# Multiple executers will load the data paralally
df1 = spark.read.format("csv").schema(myManualSchema)\
.load("dbfs:/mnt/data/data/P_By_DEST_COUNTRY_NAME_2")

In [0]:
df1.createOrReplaceTempView("flight_data_2015")
sqlWay = spark\
.sql("""
select *
from flight_data_2015
where dest_country_name='United States'
""")

In [0]:
sqlWay.show(10000000)

In [0]:
# Notice the number of partitions created
df1.rdd.getNumPartitions()

In [0]:
df1.display()

ORIGIN_COUNTRY_NAME,count,DEST_COUNTRY_NAME
Romania,4,United States
Ireland,3,United States
India,3,United States
Singapore,3,United States
Grenada,4,United States
Marshall Islands,0,United States
Sint Maarten,1,United States
Afghanistan,5,United States
Russia,5,United States
Federated States of Micronesia,1,United States
