
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
flight_file_location = "/FileStore/tables/flightData.csv"
pass_file_location = "/FileStore/tables/passengers.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_flight = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(flight_file_location)


df_pass = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(pass_file_location)



display(df_flight)

passengerId,flightId,from,to,date
48,0,cg,ir,2017-01-01
94,0,cg,ir,2017-01-01
82,0,cg,ir,2017-01-01
21,0,cg,ir,2017-01-01
51,0,cg,ir,2017-01-01
33,0,cg,ir,2017-01-01
20,0,cg,ir,2017-01-01
10,0,cg,ir,2017-01-01
49,0,cg,ir,2017-01-01
32,0,cg,ir,2017-01-01


###Rename columns of flight dataset

In [0]:
df_flight = df_flight.withColumnRenamed("date","journey_date").withColumnRenamed("passengerId","pid").withColumnRenamed("flightid","fid")
df_flight.show(10)

+---+---+----+---+------------+
|pid|fid|from| to|journey_date|
+---+---+----+---+------------+
| 48|  0|  cg| ir|  2017-01-01|
| 94|  0|  cg| ir|  2017-01-01|
| 82|  0|  cg| ir|  2017-01-01|
| 21|  0|  cg| ir|  2017-01-01|
| 51|  0|  cg| ir|  2017-01-01|
| 33|  0|  cg| ir|  2017-01-01|
| 20|  0|  cg| ir|  2017-01-01|
| 10|  0|  cg| ir|  2017-01-01|
| 49|  0|  cg| ir|  2017-01-01|
| 32|  0|  cg| ir|  2017-01-01|
+---+---+----+---+------------+
only showing top 10 rows



In [0]:
# Create a view or table

temp_table_name = "flightdata"

df_flight.createOrReplaceTempView(temp_table_name)

In [0]:
# %sql

#Query the created temp table in a SQL cell */

# select * from `flightData`*/

In [0]:
import pyspark.sql.functions as sqlf
# df_flight.show(10)
df_flight2 = df_flight.withColumn("journey_month",sqlf.lit(sqlf.month('journey_date'))).drop("journey_date")
df_flight2.printSchema()

root
 |-- pid: string (nullable = true)
 |-- fid: string (nullable = true)
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)
 |-- journey_month: integer (nullable = true)



In [0]:
# df_flight2.groupBy("fid","journey_month").count().show()
df_flight2.groupBy("journey_month").count()\
    .withColumnRenamed("journey_month","Month").\
        withColumnRenamed("count","Number of Flights")\
            .orderBy("journey_month")\
                .show()

# df_flight2.orderBy("journey_month").show()


+-----+-----------------+
|Month|Number of Flights|
+-----+-----------------+
|    1|             9700|
|    2|             7300|
|    3|             8200|
|    4|             9200|
|    5|             9200|
|    6|             7100|
|    7|             8700|
|    8|             7600|
|    9|             8500|
|   10|             7600|
|   11|             7500|
|   12|             9400|
+-----+-----------------+



### Rename passenger dataset columns

In [0]:
# df_pass.printSchema()

df_pass = df_pass.withColumnRenamed("passengerId","pid")\
.withColumnRenamed("firstName","fname")\
.withColumnRenamed("lastName","lname")

# df_pass.count()
df_pass.show(10)

+-----+---------+--------+
|  pid|    fname|   lname|
+-----+---------+--------+
|14751| Napoleon| Gaylene|
| 2359| Katherin| Shanell|
| 5872|   Stevie|  Steven|
| 3346|Margarita|   Gerri|
| 3704|    Earle|  Candis|
| 1226|    Trent|    Omer|
| 2677|    Janee|  Lillia|
|  179|     Gita|Chastity|
| 9763|   Hilton|Jaquelyn|
|11414|      Leo|Margaret|
+-----+---------+--------+
only showing top 10 rows



In [0]:
# df_pass.count() ##15500
# df_pass.select("fname","lname").distinct().count() #15496
# df_pass.orderBy("fname","lname").show()

# df_stage = df_flight2.join(df_pass, df_pass.pid == df_flight2.pid, "inner") \
#      .select("df_flight2.*","df_pass.fname")
# df_stage.show()

df_stage = df_flight2.join(df_pass, df_pass.pid == df_flight2.pid, "inner").select(df_flight2["*"],df_pass["lname"])

df_stage.show()

+---+---+----+---+-------------+---------+
|pid|fid|from| to|journey_month|    lname|
+---+---+----+---+-------------+---------+
| 48|  0|  cg| ir|            1|    Annis|
| 94|  0|  cg| ir|            1|     Mina|
| 82|  0|  cg| ir|            1|      Jed|
| 21|  0|  cg| ir|            1|   Dorsey|
| 51|  0|  cg| ir|            1|  Kaleigh|
| 33|  0|  cg| ir|            1|    Elmer|
| 20|  0|  cg| ir|            1|Gabriella|
| 10|  0|  cg| ir|            1|Guillermo|
| 49|  0|  cg| ir|            1|     Kimi|
| 32|  0|  cg| ir|            1| Adrianna|
| 70|  0|  cg| ir|            1|   Walter|
| 28|  0|  cg| ir|            1|    Milan|
| 42|  0|  cg| ir|            1|    Sheri|
| 62|  0|  cg| ir|            1|  Brianne|
| 80|  0|  cg| ir|            1|  Joselyn|
| 13|  0|  cg| ir|            1|   Remona|
| 46|  0|  cg| ir|            1|  Michiko|
| 43|  0|  cg| ir|            1|    Paula|
| 17|  0|  cg| ir|            1|  Ellamae|
| 16|  0|  cg| ir|            1|  Tamekia|
+---+---+--

In [0]:
df_stage

In [0]:
import pyspark.sql.functions as fnsql #row_number

df_stage.select("*").groupBy("pid") \
    .agg(fnsql.count("pid").alias("countx"))\
        .filter(fnsql.col("countx") >20).show()



+----+------+
| pid|countx|
+----+------+
|4249|    21|
| 613|    23|
|9441|    23|
|5096|    25|
|3078|    22|
|1673|    23|
|2185|    21|
|2926|    21|
|7643|    23|
| 258|    22|
|1651|    21|
|3405|    23|
| 917|    25|
|1982|    21|
|1598|    23|
|8961|    26|
|8411|    23|
|4399|    21|
|2437|    22|
|2251|    23|
+----+------+
only showing top 20 rows



In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "flightData_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)