<a href="https://colab.research.google.com/github/siddheshpednekar/bdapracticals_sem3/blob/main/P3readingdataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [None]:
# File location and type
file_location = "/FileStore/tables/iris__1_-1.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

In [None]:
# Create a view or table

temp_table_name = "iris__1_-1_csv"

df.createOrReplaceTempView(temp_table_name)

In [None]:
%sql

/* Query the created temp table in a SQL cell */

select * from `iris__1_-1_csv`

In [None]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "iris__1_-1_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [None]:
#Grouping Data

In [None]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [None]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [None]:
df.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [None]:
df.groupby('fruit').avg().show()

+------+------------------+------------------+
| fruit|           avg(v1)|           avg(v2)|
+------+------------------+------------------+
|banana|3.3333333333333335|33.333333333333336|
|carrot| 4.666666666666667|46.666666666666664|
| grape|               6.0|              60.0|
+------+------------------+------------------+



In [None]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



In [None]:
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



In [None]:
df.groupBy('color').mean().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [None]:
df.groupBy('color').count().show()

+-----+-----+
|color|count|
+-----+-----+
|  red|    5|
| blue|    2|
|black|    1|
+-----+-----+



In [None]:
df.groupBy('color').min().show()

+-----+-------+-------+
|color|min(v1)|min(v2)|
+-----+-------+-------+
|  red|      1|     10|
| blue|      2|     20|
|black|      6|     60|
+-----+-------+-------+



In [None]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1.sum())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  6| 20|
| blue| grape|  6| 40|
|  red|banana| 24| 10|
|  red|carrot| 24| 30|
|  red|carrot| 24| 50|
|  red|banana| 24| 70|
|  red| grape| 24| 80|
+-----+------+---+---+



In [None]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1.max())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  4| 20|
| blue| grape|  4| 40|
|  red|banana|  8| 10|
|  red|carrot|  8| 30|
|  red|carrot|  8| 50|
|  red|banana|  8| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [None]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1.std())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

+-----+------+----+---+
|color| fruit|  v1| v2|
+-----+------+----+---+
|black|carrot|null| 60|
| blue|banana|   1| 20|
| blue| grape|   1| 40|
|  red|banana|   2| 10|
|  red|carrot|   2| 30|
|  red|carrot|   2| 50|
|  red|banana|   2| 70|
|  red| grape|   2| 80|
+-----+------+----+---+



In [None]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+



In [None]:
df2.groupby('id').cogroup(df1.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000101|  2|2.0|  y|
+--------+---+---+---+



In [None]:
# import pyspark class Row from module sql
from pyspark.sql import *

In [None]:
# Create Example Data - Departments and Employees

# Create the Departments
department= Row("id","name")
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')


In [None]:
# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)
employee5 = Employee('michael', 'jackson', 'no-reply@neverla.nd', 80000)


In [None]:
# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee5, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)

Row(id='123456', name='Computer Science')
Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)
no-reply@berkeley.edu


In [None]:
print(departmentWithEmployees1.employees[0].salary)

100000


In [None]:
print(Employee)

<Row('firstName', 'lastName', 'email', 'salary')>


In [None]:
seq1 = [employee1, employee2,employee3,employee4,employee5]
df1 = spark.createDataFrame(seq1)

In [None]:
df1.show()

+---------+--------+--------------------+------+
|firstName|lastName|               email|salary|
+---------+--------+--------------------+------+
|  michael|armbrust|no-reply@berkeley...|100000|
| xiangrui|    meng|no-reply@stanford...|120000|
|    matei|    null|no-reply@waterloo...|140000|
|     null| wendell|no-reply@berkeley...|160000|
|  michael| jackson| no-reply@neverla.nd| 80000|
+---------+--------+--------------------+------+



In [None]:
df1.drop('LastName').show()

+---------+--------------------+------+
|firstName|               email|salary|
+---------+--------------------+------+
|  michael|no-reply@berkeley...|100000|
| xiangrui|no-reply@stanford...|120000|
|    matei|no-reply@waterloo...|140000|
|     null|no-reply@berkeley...|160000|
|  michael| no-reply@neverla.nd| 80000|
+---------+--------------------+------+



In [None]:
# Remove the file if it exists
dbutils.fs.rm("/tmp/databricks-df-example.parquet", True)
df1.write.format("parquet").save("/tmp/databricks-df-example.parquet")

In [None]:
parquetDF = spark.read.format("parquet").load("/tmp/databricks-df-example.parquet")
parquetDF.show(truncate=False)

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|michael  |armbrust|no-reply@berkeley.edu|100000|
|xiangrui |meng    |no-reply@stanford.edu|120000|
|michael  |jackson |no-reply@neverla.nd  |80000 |
|null     |wendell |no-reply@berkeley.edu|160000|
|matei    |null    |no-reply@waterloo.edu|140000|
+---------+--------+---------------------+------+



In [None]:
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)

df1.show(truncate=False)

departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

df2.show(truncate=False)

+--------------------------------+-----------------------------------------------------------------------------------------------------+
|department                      |employees                                                                                            |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|{123456, Computer Science}      |[{michael, armbrust, no-reply@berkeley.edu, 100000}, {xiangrui, meng, no-reply@stanford.edu, 120000}]|
|{789012, Mechanical Engineering}|[{matei, null, no-reply@waterloo.edu, 140000}, {null, wendell, no-reply@berkeley.edu, 160000}]       |
+--------------------------------+-----------------------------------------------------------------------------------------------------+

+---------------------------+------------------------------------------------------------------------------------------------+
|department                 |employees            

In [None]:
unionDF = df1.union(df2)
unionDF.show(truncate=False)

+--------------------------------+-----------------------------------------------------------------------------------------------------+
|department                      |employees                                                                                            |
+--------------------------------+-----------------------------------------------------------------------------------------------------+
|{123456, Computer Science}      |[{michael, armbrust, no-reply@berkeley.edu, 100000}, {xiangrui, meng, no-reply@stanford.edu, 120000}]|
|{789012, Mechanical Engineering}|[{matei, null, no-reply@waterloo.edu, 140000}, {null, wendell, no-reply@berkeley.edu, 160000}]       |
|{345678, Theater and Drama}     |[{michael, jackson, no-reply@neverla.nd, 80000}, {null, wendell, no-reply@berkeley.edu, 160000}]     |
|{901234, Indoor Recreation}     |[{xiangrui, meng, no-reply@stanford.edu, 120000}, {matei, null, no-reply@waterloo.edu, 140000}]      |
+--------------------------------+-------

In [None]:
from pyspark.sql.functions import explode

explodeDF = unionDF.select(explode("employees").alias("e"))
flattenDF = explodeDF.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")

flattenDF.show(truncate=False)

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|michael  |armbrust|no-reply@berkeley.edu|100000|
|xiangrui |meng    |no-reply@stanford.edu|120000|
|matei    |null    |no-reply@waterloo.edu|140000|
|null     |wendell |no-reply@berkeley.edu|160000|
|michael  |jackson |no-reply@neverla.nd  |80000 |
|null     |wendell |no-reply@berkeley.edu|160000|
|xiangrui |meng    |no-reply@stanford.edu|120000|
|matei    |null    |no-reply@waterloo.edu|140000|
+---------+--------+---------------------+------+



In [None]:
explodeDF2 = unionDF.select(explode("department").alias("d"))
flattenDF2 = explodeDF2.selectExpr("d.id","d.name")

flattenDF2.show(truncate=False)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-2353303162657079>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mexplodeDF2[0m [0;34m=[0m [0munionDF[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0mexplode[0m[0;34m([0m[0;34m"department"[0m[0;34m)[0m[0;34m.[0m[0malias[0m[0;34m([0m[0;34m"d"[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0mflattenDF2[0m [0;34m=[0m [0mexplodeDF2[0m[0;34m.[0m[0mselectExpr[0m[0;34m([0m[0;34m"d.id"[0m[0;34m,[0m[0;34m"d.name"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m [0;34m[0m[0m
[1;32m      4[0m [0mflattenDF2[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0mtruncate[0m[0;34m=[0m[0;32mFalse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mselect[0;34m(self, *c

In [None]:
filterDF = flattenDF.filter(flattenDF.firstName == "xiangrui").sort(flattenDF.lastName)
filterDF.show(truncate=False)

+---------+--------+---------------------+------+
|firstName|lastName|email                |salary|
+---------+--------+---------------------+------+
|xiangrui |meng    |no-reply@stanford.edu|120000|
|xiangrui |meng    |no-reply@stanford.edu|120000|
+---------+--------+---------------------+------+



In [None]:
@pandas_udf("integer")
def plus_one(seies: pd.Series)->pd.series:
  return series+1
spark.udf.register('plus_one', plus_one)
spark.sql("select plus_one v1 from tableA").show()