## Exploring available datasets

In [0]:
dbutils.fs.ls("/databricks-datasets")

[FileInfo(path='dbfs:/databricks-datasets/COVID/', name='COVID/', size=0, modificationTime=1701096739789),
 FileInfo(path='dbfs:/databricks-datasets/README.md', name='README.md', size=976, modificationTime=1532502324000),
 FileInfo(path='dbfs:/databricks-datasets/Rdatasets/', name='Rdatasets/', size=0, modificationTime=1701096739789),
 FileInfo(path='dbfs:/databricks-datasets/SPARK_README.md', name='SPARK_README.md', size=3359, modificationTime=1455505834000),
 FileInfo(path='dbfs:/databricks-datasets/adult/', name='adult/', size=0, modificationTime=1701096739789),
 FileInfo(path='dbfs:/databricks-datasets/airlines/', name='airlines/', size=0, modificationTime=1701096739789),
 FileInfo(path='dbfs:/databricks-datasets/amazon/', name='amazon/', size=0, modificationTime=1701096739789),
 FileInfo(path='dbfs:/databricks-datasets/asa/', name='asa/', size=0, modificationTime=1701096739789),
 FileInfo(path='dbfs:/databricks-datasets/atlas_higgs/', name='atlas_higgs/', size=0, modificationTime=

## Reading data from data source

In [0]:
from pyspark.sql import functions as F

In [0]:
file_location = "dbfs:/databricks-datasets/flights/"
file_type = "csv" 

df = spark.read.format(file_type) \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load(file_location)

df.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

## Transforming the data

In [0]:
less_than_500_miles = df.filter(df["distance"] < 500)
less_than_500_miles.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01020600|   -8|     369|   ABE|        DTW|
|01061230|    0|     369|   ABE|        DTW|
|01070600|    0|     369|   ABE|        DTW|
|01071230|    0|     369|   ABE|        DTW|
|01080600|    0|     369|   ABE|        DTW|
|01081230|   33|     369|   ABE|        DTW|
|01090600|  151|     369|   ABE|        DTW|
|01091230|   -4|     369|   ABE|        DTW|
|01100600|   -5|     369|   ABE|        DTW|
|01101230|   -8|     369|   ABE|        DTW|
|01110600|   -9|     369|   ABE|        DTW|
|01130600|   -7|     369|   ABE|        DTW|
|01131230|  -13|     369|   ABE|        DTW|
|01140600|   -9|     369|   ABE|        DTW|
|01141230|   -8|     369|   ABE|        DTW|
|01150600|    0|     369|   ABE|        DTW|
|01151230|    0|     369|   ABE|        DTW|
|01160600|   -1|     369|   ABE|        DTW|
|01161230|   -7|     369|   ABE|        DTW|
|01170600|

In [0]:
avg_delay_less_than_500 = df.groupBy("origin").agg(F.round(F.avg("delay"), 2).alias("average_delay"))
avg_delay_less_than_500.show()

+------+-------------+
|origin|average_delay|
+------+-------------+
|   MSY|        11.81|
|   SNA|         8.64|
|   PSG|        -0.25|
|   MYR|         7.86|
|   PVD|        10.63|
|   OAK|        12.79|
|   MQT|        23.87|
|   MSN|         9.36|
|   SCC|        -1.52|
|   MLU|         7.91|
|   WRG|         -2.5|
|   LEX|        13.84|
|   RDM|         4.34|
|   ORF|         12.9|
|   SCE|        17.92|
|   SAV|        12.74|
|   TRI|         6.67|
|   MOD|         9.93|
|   TYR|         3.57|
|   MOB|        11.69|
+------+-------------+
only showing top 20 rows



## Writing to data sink

In [0]:
%fs mkdirs /mnt/de-mini-project-11

In [0]:
avg_delay_less_than_500.write.parquet("/mnt/de-mini-project-11/avg_delay_less_than_500")

## Checking if it is written

In [0]:
%fs ls /mnt/de-mini-project-11

path,name,size,modificationTime
dbfs:/mnt/de-mini-project-11/avg_delay_less_than_500/,avg_delay_less_than_500/,0,1701099759000


In [0]:
data_path = "/mnt/de-mini-project-11/avg_delay_less_than_500/"
df = spark.read.format("parquet").load(data_path)
df.show()

+------+-------------+
|origin|average_delay|
+------+-------------+
|   MSY|        11.81|
|   SNA|         8.64|
|   PSG|        -0.25|
|   MYR|         7.86|
|   PVD|        10.63|
|   OAK|        12.79|
|   MQT|        23.87|
|   MSN|         9.36|
|   SCC|        -1.52|
|   MLU|         7.91|
|   WRG|         -2.5|
|   LEX|        13.84|
|   RDM|         4.34|
|   ORF|         12.9|
|   SCE|        17.92|
|   SAV|        12.74|
|   TRI|         6.67|
|   MOD|         9.93|
|   TYR|         3.57|
|   MOB|        11.69|
+------+-------------+
only showing top 20 rows

