In [0]:
# Introduction
# Author: Martand Singh
# Gmail: martandsays@gmail.com
# Facebook: https://www.facebook.com/codemakerz
# Dataset: https://data.world/carlvlewis/u-s-weather-outliers-1964
# In this notebook we will set up our environment. We will learn:
# 1. How to create a database in DBFS (the Databricks File System, an HDFS-like distributed file system)
# 2. How to save data to a database in DBFS
# 3. How to partition your data
# 4. How to explore the data table, its partitions, and the partition data at a DBFS location
# 5. How to run basic SQL queries against the tables we created

In [0]:
# Read the raw CSV with Spark's DataFrameReader:
#   format()  - the input file format (csv, json, parquet, ... depending on your input file)
#   option()  - extra reader options; header=true means the first row supplies the column names
#   load()    - the path of the input file
# Note: inferSchema is not set, so every column (numbers and dates included) is read as a string.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load('/FileStore/tables/weather_anomalies_1964_2013.csv')
)

In [0]:
# show() prints the first 20 rows by default; pass any positive number to change that. Here we show 5.
df.show(n=5)

In [0]:
# How to run SQL queries on your dataset. There are several ways:
# Way 1: create a local temporary view and explore it. It is visible only to your Spark session and is dropped when the session ends.
# Way 2: create a Delta Lake table.

In [0]:
# Way 1: register the DataFrame as a temporary view so it can be queried with SQL (replaces any view of the same name).
df.createOrReplaceTempView(name="tempweatherdata")

In [0]:
%sql
-- Peek at the temporary view built from the raw CSV. Without ORDER BY, which 5 rows come back is not guaranteed.
select * from tempweatherdata limit 5

date_str,degrees_from_mean,id,longitude,latitude,max_temp,min_temp,station_name,type,serialid
1977-02-19,8.61,USC00103882,-113.5472,43.7186,10.0,-12.8,GROUSE,Weak Hot,1
1977-02-19,10.74,USC00053951,-107.1097,37.7717,11.1,-8.9,HERMIT 7 ESE,Weak Hot,2
1977-02-19,20.46,USC00040379,-119.5128,37.0919,25.6,12.8,AUBERRY 2 NW,Strong Hot,3
1977-02-19,8.6,USC00020808,-109.7517,33.4783,20.0,-3.9,BLACK RIVER PUMPS,Weak Hot,4
1977-02-19,10.3,USC00042598,-115.4508,33.8089,30.6,13.9,EAGLE MTN,Weak Hot,5


In [0]:
# Print the column names and types. Because the CSV was read without inferSchema, every column is a
# string - including date_str. Let's change date_str to a date type.
df.printSchema()

In [0]:
# Replace the string column date_str with a DATE column named date_new. Every other column keeps its
# original order and the cast column is appended at the end (the same layout as withColumn + drop).
df_new = df.select(
    *[col_name for col_name in df.columns if col_name != "date_str"],
    df["date_str"].cast("date").alias("date_new"),
)

df_new.printSchema()
# The new column is named date_new. You could rename it back to date_str, but I am keeping it as is.

In [0]:
# Way 2: store this data in DBFS and then register it as a Delta table.
# Step 1: write the DataFrame in Delta format to a DBFS path, overwriting any previous run.
# (The path spelling "anamolies" is kept as-is because the CREATE TABLE cell refers to exactly this location.)
df_new.write.save("/mnt/dsfda/weather-anamolies", format="delta", mode="overwrite")

In [0]:
%sql
-- Step 2: register the Delta files written above as a table that points at their existing location.
-- IF NOT EXISTS makes re-running this cell a no-op.
create table if not exists weather_unpart
  using delta
  location '/mnt/dsfda/weather-anamolies'

In [0]:
%sql
-- Peek at the unpartitioned Delta table; date_new now appears as the last column.
SELECT *
FROM weather_unpart
LIMIT 5

degrees_from_mean,id,longitude,latitude,max_temp,min_temp,station_name,type,serialid,date_new
8.61,USC00103882,-113.5472,43.7186,10.0,-12.8,GROUSE,Weak Hot,1,1977-02-19
10.74,USC00053951,-107.1097,37.7717,11.1,-8.9,HERMIT 7 ESE,Weak Hot,2,1977-02-19
20.46,USC00040379,-119.5128,37.0919,25.6,12.8,AUBERRY 2 NW,Strong Hot,3,1977-02-19
8.6,USC00020808,-109.7517,33.4783,20.0,-3.9,BLACK RIVER PUMPS,Weak Hot,4,1977-02-19
10.3,USC00042598,-115.4508,33.8089,30.6,13.9,EAGLE MTN,Weak Hot,5,1977-02-19


In [0]:
# So we saw two ways of querying your data using SQL queries.
# Now we will look at partitioning your data. Partitioning means dividing the stored data into separate folders based on the
# values of a column. There is no fixed rule of thumb, but you should choose a column as your partition key if it is used
# frequently in join or WHERE clauses (a filter on the partition column lets Spark skip whole folders).
# In our case we will use the date column as the partition key.


In [0]:
%sql
-- Create a separate database for our partitioned data; IF NOT EXISTS makes the cell safe to re-run.
-- Spark SQL is largely HiveQL-compatible, so if you are a Hive developer these statements will look familiar.
-- (Spark itself does not require HDFS; on Databricks the data lives in DBFS.)
-- NOTE(review): the CREATE TABLE statements below use unqualified table names, so they are created in the
-- session's current database rather than in weather_db.
create database if not exists weather_db;

In [0]:
# Save the data to DBFS (Databricks File System) in Delta format, partitioned by date_new: every distinct
# date is written to its own sub-folder named date_new=YYYY-MM-DD. As we saw earlier, our data is organised
# by date, so the date column is a natural partition key.
df_new.write.save("/mnt/weatherdata", format="delta", mode="overwrite", partitionBy="date_new")

In [0]:
# lets check above location to see how/where our data is stored. So you can see our data is stored in form our multiple folders 
# based on our date column. I am showing only 10 folder. but there are more. In our case we chosed date as parition key. This is just for the 
# for the sake of tutorial. You should not make too small parition also. So you can choose some other key like year, month, weekly. It all 
# depends on your use case.
!ls /dbfs/mnt/weatherdata | head -10

In [0]:
# if we explore a folder. You will see each folder has a file
!ls /dbfs/mnt/weatherdata/'date_new=1964-01-01'

In [0]:
# Count the records for a single date (the author observed 335 records for 1964-01-01).
# Load the Delta table from its root and filter on the partition column, instead of pointing the reader at
# the partition folder /mnt/weatherdata/date_new=1964-01-01 itself. The Delta transaction log (_delta_log)
# lives only at the table root, so loading a partition sub-directory by path is not a supported way to read
# a Delta table. Because date_new is the partition column, Spark prunes partitions and reads only the
# date_new=1964-01-01 folder. That pruning is what makes a filter on a partitioned table faster than the
# same filter on unpartitioned data.

spark.read.format("delta").load("/mnt/weatherdata").where("date_new = '1964-01-01'").count()

In [0]:
# You can also look at the data itself for 1964-01-01 (one day). As above, load the Delta table root and
# filter on the partition column rather than reading the date_new=1964-01-01 sub-folder by path. Spark
# still reads only that partition, and the date_new column is kept in the result.
spark.read.format("delta").load("/mnt/weatherdata").where("date_new = '1964-01-01'").show(10)

In [0]:
%sql
-- Register the partitioned Delta data as a table that points at its existing location.
-- NOTE(review): the name is unqualified, so the table is created in the session's current database
-- (nothing above runs USE weather_db), not in the weather_db database created earlier.
create table if not exists weather_part
  using delta
  location '/mnt/weatherdata/'

In [0]:
%sql
-- Query the partitioned table. Without ORDER BY, which 10 rows come back is not guaranteed.
SELECT * FROM weather_part LIMIT 10;

degrees_from_mean,id,longitude,latitude,max_temp,min_temp,station_name,type,serialid,date_new
-10.67,USC00293530,-108.2075,33.1975,12.8,-18.3,GILA HOT SPRINGS,Weak Cold,73837,1964-01-10
-9.96,USC00295150,-106.7611,34.7675,9.4,-17.8,LOS LUNAS 3 SSW,Weak Cold,73838,1964-01-10
-9.87,USC00429136,-113.6667,37.3522,5.0,-13.3,VEYO PWR HOUSE,Weak Cold,73839,1964-01-10
-10.9,USC00340256,-95.615,34.2208,11.7,-12.2,ANTLERS,Weak Cold,73840,1964-01-10
-8.5,USC00046635,-116.5097,33.8275,17.2,-2.2,PALM SPRINGS,Weak Cold,73841,1964-01-10
-6.86,USW00023136,-119.0833,34.2167,16.1,-0.6,CAMARILLO AP,Weak Cold,73842,1964-01-10
-9.55,USC00026865,-114.2272,33.665,13.9,-5.6,QUARTZSITE,Weak Cold,73843,1964-01-10
-8.41,USC00029287,-112.7403,33.9792,17.8,-8.3,WICKENBURG,Weak Cold,73844,1964-01-10
-11.71,USC00029334,-109.8369,32.2553,16.7,-14.4,WILLCOX,Weak Cold,73845,1964-01-10
-8.85,USW00023158,-114.7142,33.6186,15.0,-3.3,BLYTHE AP,Weak Cold,73846,1964-01-10


In [0]:
# So now we have two Delta tables: weather_part (partitioned) and weather_unpart (unpartitioned). We will run the same
# query against both (and against the temp view) and compare the performance.

In [0]:
%sql
-- Partitioned table: filtering on the partition column lets Spark read only the date_new=1964-01-01 folder.
SELECT * FROM weather_part WHERE date_new = '1964-01-01'

degrees_from_mean,id,longitude,latitude,max_temp,min_temp,station_name,type,serialid,date_new
14.96,USC00217460,-93.3211,46.7953,7.8,-6.1,SANDY LAKE DAM LIBBY,Weak Hot,94550,1964-01-01
8.76,USC00040379,-119.5128,37.0919,21.7,2.2,AUBERRY 2 NW,Weak Hot,94551,1964-01-01
13.48,USC00485055,-106.6375,43.7144,16.7,-2.8,KAYCEE,Weak Hot,94552,1964-01-01
14.19,USC00241169,-105.83,47.2875,11.7,-6.1,BROCKWAY 3 WSW,Weak Hot,94553,1964-01-01
-10.42,USC00164674,-91.7108,29.9592,7.8,-5.6,JEANERETTE 5 NW,Weak Cold,94554,1964-01-01
10.69,USC00267175,-114.9911,39.2764,13.9,-10.6,RUTH,Weak Hot,94555,1964-01-01
12.61,USC00215392,-93.6617,45.7533,5.6,-6.7,MILACA,Weak Hot,94556,1964-01-01
7.56,USC00020750,-110.5411,36.6778,12.8,1.1,BETATAKIN,Weak Hot,94557,1964-01-01
15.89,USC00213455,-96.9406,48.7714,3.9,-9.4,HALLOCK,Weak Hot,94558,1964-01-01
-15.89,USC00203504,-85.3828,42.3833,-3.9,-24.4,GULL LK BIOLOGICAL STN,Weak Cold,94559,1964-01-01


In [0]:
%sql
-- Unpartitioned Delta table: same filter, but there are no partition folders for Spark to skip.
SELECT *
  FROM weather_unpart
 WHERE date_new = '1964-01-01'

degrees_from_mean,id,longitude,latitude,max_temp,min_temp,station_name,type,serialid,date_new
14.96,USC00217460,-93.3211,46.7953,7.8,-6.1,SANDY LAKE DAM LIBBY,Weak Hot,94550,1964-01-01
8.76,USC00040379,-119.5128,37.0919,21.7,2.2,AUBERRY 2 NW,Weak Hot,94551,1964-01-01
13.48,USC00485055,-106.6375,43.7144,16.7,-2.8,KAYCEE,Weak Hot,94552,1964-01-01
14.19,USC00241169,-105.83,47.2875,11.7,-6.1,BROCKWAY 3 WSW,Weak Hot,94553,1964-01-01
-10.42,USC00164674,-91.7108,29.9592,7.8,-5.6,JEANERETTE 5 NW,Weak Cold,94554,1964-01-01
10.69,USC00267175,-114.9911,39.2764,13.9,-10.6,RUTH,Weak Hot,94555,1964-01-01
12.61,USC00215392,-93.6617,45.7533,5.6,-6.7,MILACA,Weak Hot,94556,1964-01-01
7.56,USC00020750,-110.5411,36.6778,12.8,1.1,BETATAKIN,Weak Hot,94557,1964-01-01
15.89,USC00213455,-96.9406,48.7714,3.9,-9.4,HALLOCK,Weak Hot,94558,1964-01-01
-15.89,USC00203504,-85.3828,42.3833,-3.9,-24.4,GULL LK BIOLOGICAL STN,Weak Cold,94559,1964-01-01


In [0]:

%sql
-- Temp view over the raw CSV: date_str is still a string column here. The view is not cached, so every query
-- re-reads and parses the CSV file.
SELECT * FROM tempweatherdata WHERE date_str = '1964-01-01'

date_str,degrees_from_mean,id,longitude,latitude,max_temp,min_temp,station_name,type,serialid
1964-01-01,14.96,USC00217460,-93.3211,46.7953,7.8,-6.1,SANDY LAKE DAM LIBBY,Weak Hot,94550
1964-01-01,8.76,USC00040379,-119.5128,37.0919,21.7,2.2,AUBERRY 2 NW,Weak Hot,94551
1964-01-01,13.48,USC00485055,-106.6375,43.7144,16.7,-2.8,KAYCEE,Weak Hot,94552
1964-01-01,14.19,USC00241169,-105.83,47.2875,11.7,-6.1,BROCKWAY 3 WSW,Weak Hot,94553
1964-01-01,-10.42,USC00164674,-91.7108,29.9592,7.8,-5.6,JEANERETTE 5 NW,Weak Cold,94554
1964-01-01,10.69,USC00267175,-114.9911,39.2764,13.9,-10.6,RUTH,Weak Hot,94555
1964-01-01,12.61,USC00215392,-93.6617,45.7533,5.6,-6.7,MILACA,Weak Hot,94556
1964-01-01,7.56,USC00020750,-110.5411,36.6778,12.8,1.1,BETATAKIN,Weak Hot,94557
1964-01-01,15.89,USC00213455,-96.9406,48.7714,3.9,-9.4,HALLOCK,Weak Hot,94558
1964-01-01,-15.89,USC00203504,-85.3828,42.3833,-3.9,-24.4,GULL LK BIOLOGICAL STN,Weak Cold,94559


In [0]:
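# So we can see the difference. In my case:
# Scenario                           Time (s, single run)
# ------------------------------------------
# Temp Table                         6.02
# Unpartitioned Delta  table         0.6
# Partitioned Delta table            0.32

# So now you understand the importance of partitioning data. These are single, un-repeated timings, so treat them as
# indicative only. The temp view is also slower partly because it re-reads and parses the raw CSV on every query,
# not only because it is unpartitioned. One thing is clear: if you have a very large dataset, you would not want to
# go with the temp table approach.

# Thanks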