## Overview

This notebook shows you how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html), the Databricks File System, lets you store data for querying inside Databricks. This notebook assumes that the file you would like to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
# Where the uploaded file lives in DBFS, and which reader format to use
file_location = "/FileStore/tables/2015_summary-ebaee.json"
file_type = "json"

# Reader settings that only matter for CSV input
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# Configure the reader one option at a time, then load the file.
# These options target CSV files; for other file types they will be ignored.
reader = spark.read.format(file_type)
reader = reader.option("inferSchema", infer_schema)
reader = reader.option("header", first_row_is_header)
reader = reader.option("sep", delimiter)
df = reader.load(file_location)

display(df)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [3]:
# Print df's schema (column names, types, nullability) as a tree
df.printSchema()

In [4]:
from pyspark.sql.functions import col, column
# Two ways to build a reference to a column by name. Neither call is tied to
# a DataFrame here; only the last expression's value is displayed by the cell.
col("someColumnName")
column("someColumnName")

In [5]:
from pyspark.sql.functions import expr
# Build a Column from a SQL-style expression string. someCol and otherCol are
# placeholder names; nothing is evaluated against a DataFrame in this cell.
expr("(((someCol + 5) * 200) - 6) < otherCol")

In [6]:
# in Python
from pyspark.sql import Row
# A Row built from positional values; its values are read back by index
myRow = Row("Hello", None, 1, False)
# Index 0 is the first value passed to Row(...)
myRow[0]

In [7]:
# in Python
# Alternative way to load the data, kept for reference (not executed):
# df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
# Register df as the temporary view "dfTable" so it can be queried with SQL
df.createOrReplaceTempView("dfTable")

In [8]:
# in Python
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Hand-written schema: two nullable string columns and one non-nullable long column
some_field = StructField("some", StringType(), True)
col_field = StructField("col", StringType(), True)
names_field = StructField("names", LongType(), False)
myManualSchema = StructType([some_field, col_field, names_field])

# One row whose values line up positionally with the three fields above
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

In [9]:
#select and selectExpr
# in Python
# Project a single column by name and show the first 2 rows
df.select("DEST_COUNTRY_NAME").show(2)

In [10]:
%sql #-- in SQL
-- Read the first two destination country names from the dfTable view
SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2

DEST_COUNTRY_NAME
United States
United States


In [11]:
# in Python
from pyspark.sql.functions import expr, col, column

# Three ways of referencing the DEST_COUNTRY_NAME column, selected side by side
dest_refs = [
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
]
df.select(*dest_refs).show(2)

In [12]:
# Keep every original column and append a flag comparing the destination
# country with the origin country, exposed as withinCountry.
within_country_expr = "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
df.selectExpr("*", within_country_expr).show(2)

In [13]:
# Print the first rows of df using show()'s default row limit
df.show()

In [14]:
# Select the same column twice: once aliased to newColumnName, once under its own name
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

In [15]:
# in Python
from pyspark.sql.functions import lit

# A constant column holding the literal 1, exposed under the name "One",
# appended after all of df's existing columns
one_column = lit(1).alias("One")
df2 = df.select(expr("*"), one_column)

In [16]:
# Print df2's schema as a tree
df2.printSchema()

In [17]:
# Show the first 2 rows of df
df.show(2)

In [18]:
# Build the DataFrame first, then inspect it, so this cell runs on a fresh
# kernel. The added column copies ORIGIN_COUNTRY_NAME under a name that
# deliberately contains spaces and a hyphen; the following cell selects it
# with backtick escaping inside selectExpr.
dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME"))
# Plain string column references need no escaping for spaces or hyphens
dfWithLongColName.select("This Long Column-Name", "ORIGIN_COUNTRY_NAME").show(2)
# Column list, including the newly added column
dfWithLongColName.columns

In [20]:
# Inside a SQL expression string the column name must be wrapped in backticks
# because it contains spaces and a hyphen. The second expression exposes the
# same column again under the alias `new col`.
# NOTE(review): requires dfWithLongColName to have a column named exactly
# `This Long Column-Name` — confirm against the cell that builds it.
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")\
.show(2)

In [21]:
# Keep rows whose count is below 2 and show the first 2 of them
df.filter(col("count") < 2).show(2)

In [22]:
# Same condition expressed with where() instead of filter()
df.where(col('count') < 2).show(2)

In [23]:
# in Python
# Sampling parameters: fixed seed, no replacement, fraction of 0.5
seed = 551
withReplacement = False
fraction = 0.5

# Draw the sample, then count how many rows it contains
sampled = df.sample(withReplacement, fraction, seed)
sampled.count()

In [24]:
# in Python
# Split df into two DataFrames using relative weights 1.0 and 3.0.
# Reuses `seed`, which must already be defined before this cell runs.
dataFrames = df.randomSplit([1.0, 3.0], seed)
# Row count of the first split
dataFrames[0].count()

In [25]:
# Capture df's schema before displaying it, so this cell works on a fresh
# kernel; later cells reuse `schema` when building new DataFrames.
schema = df.schema
schema


In [27]:
# Two extra rows to append later; each holds two country strings and a count.
# NOTE(review): values are positional, so they must follow the column order
# of the schema they are paired with — confirm against df.schema.
newRows = [
Row("New Country", "Other Country", 5),
Row("New Country 2", "Other Country 3", 1)
]

In [28]:
# Distribute the rows as an RDD, explicitly requesting 400 slices
parallelizedRows = spark.sparkContext.parallelize(newRows,400)
# Wrap the RDD in a DataFrame using the previously captured schema
newDF = spark.createDataFrame(parallelizedRows, schema)
# Report how many partitions back the resulting DataFrame
newDF.rdd.getNumPartitions()

In [29]:
# Build the DataFrame straight from the Python list and report its partition count
spark.createDataFrame(newRows, schema).rdd.getNumPartitions()

In [30]:
# Repartition into 5 partitions by DEST_COUNTRY_NAME; note this rebinds df
df = df.repartition(5, col("DEST_COUNTRY_NAME"))

In [31]:
# Number of partitions currently backing df
df.rdd.getNumPartitions()

In [32]:
# Repartition into 5 partitions by DEST_COUNTRY_NAME, then coalesce down to 2;
# note this rebinds df
df = df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

In [33]:
from pyspark.sql import DataFrame
from functools import reduce

# Fold unionAll over a list of DataFrames; with these two inputs the result
# is df.unionAll(newDF).
frames_to_stack = [df, newDF]
surnames_part = reduce(DataFrame.unionAll, frames_to_stack)
surnames_part

In [34]:
# in Python
# Append newDF to df, keep rows with count = 1 whose origin is not the
# United States, and show the result
not_from_us = col("ORIGIN_COUNTRY_NAME") != "United States"
(
    df.union(newDF)
    .where("count = 1")
    .where(not_from_us)
    .show()
)

In [35]:
# Build a DataFrame from newRows on the spot, append it to df, then keep rows
# with count = 1 whose origin is not the United States and show them
extra_rows_df = spark.createDataFrame(newRows, schema)
combined = df.union(extra_rows_df)
single_count = combined.where("count = 1")
single_count.where(col("ORIGIN_COUNTRY_NAME") != "United States").show()

In [36]:
# in Python
# Keep the filtered union in df2 instead of showing it immediately
df2 = (
    df.union(newDF)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
)

In [37]:
# Materialize every row of df2 on the driver as a Python list
df2.collect()

In [38]:
from pyspark.sql.functions import desc, asc
# Sort by count, largest first. A bare expr("count desc") is parsed as the
# column `count` aliased to `desc`, which silently sorts ascending, so the
# descending order is requested explicitly with desc().
df.orderBy(desc("count")).show(2)
# Same descending sort on count, with ties broken by destination name ascending
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

In [39]:
from pyspark.sql.functions import desc, asc
# Sort by count with no explicit direction and show the first 2 rows
df.orderBy("count").show(2)

In [40]:
# in Python
# A DataFrame restricted to at most 10 rows of df, used by the cells below
collectDF = df.limit(10)

In [41]:
# Fetch the first row of collectDF to the driver
collectDF.head()

In [42]:
# Fetch the first 5 rows of collectDF to the driver
collectDF.take(5) # take works with an Integer count

In [43]:
# Print 5 rows; the second argument (False) turns off truncation of long values
collectDF.show(5, False)
# Alternative that returns all rows to the driver (not executed):
# collectDF.collect()

In [44]:
# Narrow collectDF to at most 5 rows, then print them
collectDF.limit(5).show()
# Alternative that returns all rows to the driver (not executed):
# collectDF.collect()