In [0]:
# List the contents of the Databricks sample docs directory (the listing shows README.md).
display(dbutils.fs.ls("/databricks-datasets/samples/docs/"))

path,name,size
dbfs:/databricks-datasets/samples/docs/README.md,README.md,3137


In [0]:
# Load README.md through the generic reader with the "text" source (one row per line)
# and keep the resulting DataFrame in `TextFile`.
TextFile = spark.read.format("text").load("/databricks-datasets/samples/docs/README.md")

In [0]:
# Count the lines in README.md (the original run of this notebook reported 65).
TextFile.count()

In [0]:
%scala
/* Word count over README.md with the RDD API (the classic MapReduce example).
   `lines` holds one element per line of the file; `counts` holds (word, occurrences) pairs. */
val lines = sc.textFile("/databricks-datasets/samples/docs/README.md")
// Split on runs of whitespace and drop empty tokens. A bare split(" ") yields "" for
// blank lines, leading spaces and consecutive spaces, and those "" tokens would
// otherwise be counted as a word.
val counts = lines
  .flatMap(line => line.split("\\s+"))
  .filter(word => word.nonEmpty)
  .map(word => (word, 1))
  .reduceByKey(_ + _)

In [0]:
%scala
/* Bring the (word, count) pairs back to the driver. collect() is the action that actually
   runs the word-count job. It is called with explicit parentheses because RDD.collect() is
   declared with an empty parameter list; omitting them relies on deprecated auto-application. */
counts.collect()

In [0]:
# To review the jobs Spark ran for a cell, expand "Spark Jobs" under the cell and click "View" to check task durations and the DAG visualization.

In [0]:
# Load the population-vs-price CSV into `data`. Reader options:
#   header      -> the first line of the file holds the column names
#   inferSchema -> scan the data to assign a type to each column
data = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

In [0]:
# Row count of the CSV data as loaded (before rows with missing values are dropped below).
data.count()

In [0]:
# Drop rows with missing values, then cache the *cleaned* DataFrame, so the copy reused by
# later cells is the cached one. Calling data.cache() before rebinding `data` to the
# dropna() result would cache the uncleaned DataFrame instead.
data = data.dropna().cache()
# cache() only marks the DataFrame for caching; this first action populates the cache
# and reports the row count after missing values were dropped.
data.count()

In [0]:
# Fetch the first 10 rows to the driver as a list of Row objects.
data.head(10)

In [0]:
# Render `data` as an interactive table with the Databricks `display` helper.
display(data)

2014 rank,City,State,State Code,2014 Population estimate,2015 median sales price
101,Birmingham,Alabama,AL,212247,162.9
125,Huntsville,Alabama,AL,188226,157.7
122,Mobile,Alabama,AL,194675,122.5
114,Montgomery,Alabama,AL,200481,129.0
6,Phoenix,Arizona,AZ,1537058,206.1
33,Tucson,Arizona,AZ,527972,178.1
119,Little Rock,Arkansas,AR,197706,131.8
56,Anaheim,California,CA,346997,685.7
2,Los Angeles,California,CA,3928864,434.7
59,Riverside,California,CA,319504,281.0


In [0]:
# Register `data` as the temporary view "data_geo" so the %sql cells below can query it.
data.createOrReplaceTempView("data_geo")

In [0]:
%sql
-- State code and 2015 median sales price for every city in data_geo.
SELECT `State Code`, `2015 median sales price`
FROM data_geo

State Code,2015 median sales price
AL,162.9
AL,157.7
AL,122.5
AL,129.0
AZ,206.1
AZ,178.1
AR,131.8
CA,685.7
CA,434.7
CA,281.0


In [0]:
%sql
-- Cities in Washington (state code WA) with their 2014 population estimate.
SELECT City, `2014 Population estimate`
FROM data_geo
WHERE `State Code` = 'WA';


City,2014 Population estimate
Seattle,668342
Spokane,212052


In [0]:
%sql
-- Sum of the per-city 2015 median sales prices, grouped by state code.
-- NOTE(review): a sum of medians is not a state-level price measure; avg() may be what was intended.
SELECT `State Code`, SUM(`2015 median sales price`) AS `Sum Sales`
FROM data_geo
GROUP BY `State Code`

State Code,Sum Sales
AZ,384.2
LA,328.79999999999995
MN,209.4
NJ,350.8
DC,367.8
OR,694.6
VA,402.5
RI,233.3
KY,282.29999999999995
NH,237.4


In [0]:
%sql
-- The 10 cities with the highest 2015 median sales price, population shown in thousands.
SELECT City, `2014 Population estimate` / 1000 AS `2014 Population Estimate (1000s)`
FROM data_geo
ORDER BY `2015 median sales price` DESC
LIMIT 10

City,2014 Population Estimate (1000s)
San Jose,1015.785
San Francisco[10],852.469
Honolulu[2],350.399
Anaheim,346.997
San Diego,1381.069
Boulder,105.112
Los Angeles,3928.864
New York[6],8491.079
Boston,655.884
Washington[13],658.893


In [0]:
%scala
/* A Dataset of the numbers 0 (inclusive) to 100 (exclusive). Building it is lazy;
   collect() is the action that runs the job and returns the values to the driver. */
val range100 = spark.range(0, 100)
range100.collect()

In [0]:
%scala
/* Read people.json into an untyped DataFrame; spark.read.json infers the column
   names and types from the JSON records. */
val df = spark.read.json("/databricks-datasets/samples/people/people.json")

In [0]:
%scala
/* Typed alternative to the DataFrame above. The case class declares the field types, and
   .as[Person] matches the JSON columns to those fields by name, producing a Dataset[Person].
   NOTE(review): `age` is a non-optional Long; if any record has a missing or null age, decoding
   it fails when an action runs. Consider Option[Long] (confirm against the data). */
case class Person (name: String, age: Long)
val ds = spark.read.json("/databricks-datasets/samples/people/people.json").as[Person]

In [0]:
%scala
/* Schema for the IoT device JSON records. .as[DeviceIoTData] turns the DataFrame into a typed
   Dataset, rebinding `ds` (which previously held the Dataset[Person]). Field names must match
   the JSON column names, which is why `c02_level` keeps the spelling used in the data.
   NOTE(review): the data also has an `lcd` column (it appears in the display and parquet output
   further down) that this class does not declare. The column stays in the underlying rows, so
   column-based operations and writes still see it, but it is absent from the DeviceIoTData
   objects returned by take/collect. */
case class DeviceIoTData (
  battery_level: Long,
  c02_level: Long,
  cca2: String,
  cca3: String,
  cn: String,
  device_id: Long,
  device_name: String,
  humidity: Long,
  ip: String,
  latitude: Double,
  longitude: Double,
  scale: String,
  temp: Long,
  timestamp: Long
)
val ds = spark.read.json("/databricks-datasets/iot/iot_devices.json").as[DeviceIoTData]


In [0]:
%scala
/* Show the IoT Dataset as an interactive table. */
display(ds)


battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125


In [0]:
%scala
/* Print the first ten DeviceIoTData records, one per line. */
for (device <- ds.take(10)) println(device)

In [0]:
%scala
/* Devices whose battery_level is above 6, reduced to three columns and ordered by
   c02_level (ascending). */
val sorted_device = ds
  .select($"battery_level", $"c02_level", $"device_name")
  .filter($"battery_level" > 6)
  .orderBy($"c02_level")
display(sorted_device)

battery_level,c02_level,device_name
7,800,sensor-pad-1864985T2DlE87SJ
8,800,sensor-pad-151562COzl8oo
7,800,sensor-pad-101310Zyq9uy
9,800,sensor-pad-128292V6kgzx1j0B
8,800,device-mac-218672RVpKvk
9,800,sensor-pad-1116964vFUoUwg
8,800,meter-gauge-512894c2QTl
8,800,meter-gauge-60509Mzj9YApS
9,800,meter-gauge-20267DRcaY7
7,800,sensor-pad-134798HMJ61LKz


In [0]:
%scala
/* Register the IoT Dataset as the temporary view "iot_device_data" for the %sql query that follows. */
ds.createOrReplaceTempView("iot_device_data")

In [0]:
%sql
-- Number of distinct devices per country (cca3), largest first.
-- The count is aliased `device_count`, not `device_id`, so it does not shadow the source
-- column of the same name; with the old alias, ORDER BY device_id was ambiguous to read.
SELECT cca3, COUNT(DISTINCT device_id) AS device_count
FROM iot_device_data
GROUP BY cca3
ORDER BY device_count DESC
LIMIT 100


cca3,device_id
USA,70405
CHN,14455
JPN,12100
KOR,11879
DEU,7942
GBR,6486
CAN,6041
RUS,5989
FRA,5305
BRA,3224


In [0]:
%scala
/* Persist the IoT Dataset as the table iot_device_data2. mode("overwrite") is the same mode
   the parquet write in this notebook uses, and it makes the cell safe to re-run: the default
   save mode raises an error when the table already exists. */
ds.write.mode("overwrite").saveAsTable("iot_device_data2")

In [0]:
%scala
/* Write the IoT Dataset to /tmp/testParquet in Parquet format, replacing any previous output. */
ds.write
  .format("parquet")
  .mode("overwrite")
  .save("/tmp/testParquet")

In [0]:
# List the files written to /tmp/testParquet: the part-*.snappy.parquet data files plus
# the _SUCCESS / _started / _committed marker files.
display(dbutils.fs.ls("/tmp/testParquet"))

path,name,size
dbfs:/tmp/testParquet/_SUCCESS,_SUCCESS,0
dbfs:/tmp/testParquet/_committed_3958592101092948094,_committed_3958592101092948094,816
dbfs:/tmp/testParquet/_started_3958592101092948094,_started_3958592101092948094,0
dbfs:/tmp/testParquet/part-00000-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-87-1-c000.snappy.parquet,part-00000-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-87-1-c000.snappy.parquet,1049545
dbfs:/tmp/testParquet/part-00001-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-88-1-c000.snappy.parquet,part-00001-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-88-1-c000.snappy.parquet,1030971
dbfs:/tmp/testParquet/part-00002-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-89-1-c000.snappy.parquet,part-00002-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-89-1-c000.snappy.parquet,1030243
dbfs:/tmp/testParquet/part-00003-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-90-1-c000.snappy.parquet,part-00003-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-90-1-c000.snappy.parquet,1027643
dbfs:/tmp/testParquet/part-00004-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-91-1-c000.snappy.parquet,part-00004-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-91-1-c000.snappy.parquet,1024038
dbfs:/tmp/testParquet/part-00005-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-92-1-c000.snappy.parquet,part-00005-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-92-1-c000.snappy.parquet,1024935
dbfs:/tmp/testParquet/part-00006-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-93-1-c000.snappy.parquet,part-00006-tid-3958592101092948094-8b065b8a-be0f-4ff8-a3bd-3a1030dc5c74-93-1-c000.snappy.parquet,1025543


In [0]:
# Read the Parquet output back into a DataFrame and display it.
# NOTE(review): despite its name, `parquet_directory` holds the DataFrame, not a path.
parquet_directory = spark.read.parquet("/tmp/testParquet/")
display(parquet_directory)

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125


In [0]:
%scala
/* spark.sql executes a SQL statement and returns its result as a DataFrame. Here the statement
   is SHOW TABLES, and .show() prints the result to the cell output. */
spark.sql("show tables").show()