**Import all available functions from PySpark SQL for DataFrame operations.**

In [1]:
from pyspark.sql.functions import *

# Import the 'col' function from PySpark SQL for referencing DataFrame columns.
from pyspark.sql.functions import col

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 3, Finished, Available)

**Read CSV using "Load Data" --> "Spark" feature**

In [2]:
# Load a CSV file into a Spark DataFrame with headers.
df = spark.read.format("csv").option("header","true").load("Files/completedacct.csv")
display(df.show(10))
#display(df.limit(10))


StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 4, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+
|account_id|district_id|           frequency|parseddate|year|month|day|
+----------+-----------+--------------------+----------+----+-----+---+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|
| A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001695|         76|    Monthly Issuance|2013-01-03|2013|    1|  3|
+----------+-----------+--------------------+----------+----+---

**Read CSV using "spark.read.csv**

In [3]:
# spark.read.csv("Files/completedacct.csv").show()

# spark.read.option("header","True").csv("Files/completedacct.csv").show()

df = spark.read.option("header","true").csv("Files/completedacct.csv")
display(df.show(10))


StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 5, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+
|account_id|district_id|           frequency|parseddate|year|month|day|
+----------+-----------+--------------------+----------+----+-----+---+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|
| A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001695|         76|    Monthly Issuance|2013-01-03|2013|    1|  3|
+----------+-----------+--------------------+----------+----+---

**Select columns with col function**

In [4]:
from pyspark.sql.functions import col

# df.select(col("account_id"),col("district_id")).show()
df1 = df.select(col("account_id"),col("district_id"))
display(df1.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 6, Finished, Available)

+----------+-----------+
|account_id|district_id|
+----------+-----------+
| A00000576|         55|
| A00003818|         74|
| A00000704|         55|
| A00002378|         16|
| A00002632|         24|
| A00001972|         77|
| A00001539|          1|
| A00000793|         47|
| A00002484|         74|
| A00001695|         76|
+----------+-----------+
only showing top 10 rows



**Select columns with "selectExpr" function**

In [5]:
df.selectExpr("account_id","district_id").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 7, Finished, Available)

+----------+-----------+
|account_id|district_id|
+----------+-----------+
| A00000576|         55|
| A00003818|         74|
| A00000704|         55|
| A00002378|         16|
| A00002632|         24|
| A00001972|         77|
| A00001539|          1|
| A00000793|         47|
| A00002484|         74|
| A00001695|         76|
| A00001726|         48|
| A00002881|         70|
| A00002357|         19|
| A00002177|         62|
| A00000485|          6|
| A00000652|         21|
| A00009635|         70|
| A00001844|         44|
| A00001926|         37|
| A00002393|         10|
+----------+-----------+
only showing top 20 rows



**Using "withColumn" and "withColumns" to create a new column**

In [6]:
# to use these functions, this time we will import all the functions
from pyspark.sql.functions import *

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 8, Finished, Available)

In [7]:
# Add a new column 'dayofmonth' extracting the day of the month from 'parseddate', then display the DataFrame.
df.withColumn("dayofmonth", dayofmonth(col("parseddate"))).show()

# Add 'dayofmonth' and 'dayofweek' columns by extracting day of month and day of week from 'parseddate', then display the DataFrame.
df.withColumn("dayofmonth", dayofmonth(col("parseddate"))).withColumn("dayofweek",dayofweek(col("parseddate"))).show()


StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 9, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+----------+
|account_id|district_id|           frequency|parseddate|year|month|day|dayofmonth|
+----------+-----------+--------------------+----------+----+-----+---+----------+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|         2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|         2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|         3|
| A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|         3|
| A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|         3|
| A0

In [8]:
# Add 'dayofmonth', 'dayofweek', and 'dayofyear' columns by extracting respective date parts from 'parseddate', then display the DataFrame.
# You can use "withColumns" to have many columns

df.withColumns({"dayofmonth": dayofmonth(col("parseddate")),\
                "dayofweek": dayofweek(col("parseddate")),\
                "dayofyear": dayofyear(col("parseddate"))}).show()

# withcolumns is the easiest way to create multiple columns

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 10, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+----------+---------+---------+
|account_id|district_id|           frequency|parseddate|year|month|day|dayofmonth|dayofweek|dayofyear|
+----------+-----------+--------------------+----------+----+-----+---+----------+---------+---------+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|        3|        1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|        3|        1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|        3|        1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|         1|        3|        1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|         2|        4|        2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|         2|        4|        2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|  

**Renaming columns**

In [9]:
# Rename the 'account_id' column to 'acc_id' in the DataFrame and display the updated DataFrame.
df.withColumnRenamed("account_id", "acc_id").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 11, Finished, Available)

+---------+-----------+--------------------+----------+----+-----+---+
|   acc_id|district_id|           frequency|parseddate|year|month|day|
+---------+-----------+--------------------+----------+----+-----+---+
|A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
|A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|
|A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
|A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|
|A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|
|A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|
|A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|
|A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|
|A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|
|A00001695|         76|    Monthly Issuance|2013-01-03|2013|    1|  3|
|A00001726|         48|    Monthly Issuance|2013-01-03|2013|    1|  3|
|A0000

**Filtering using Where**

In [10]:
# Filter the DataFrame for rows where the 'frequency' column equals 'Monthly Issuance' and display the results.
df.where(col("frequency")=="Monthly Issuance").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 12, Finished, Available)

+----------+-----------+----------------+----------+----+-----+---+
|account_id|district_id|       frequency|parseddate|year|month|day|
+----------+-----------+----------------+----------+----+-----+---+
| A00000576|         55|Monthly Issuance|2013-01-01|2013|    1|  1|
| A00003818|         74|Monthly Issuance|2013-01-01|2013|    1|  1|
| A00000704|         55|Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002378|         16|Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002632|         24|Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001972|         77|Monthly Issuance|2013-01-02|2013|    1|  2|
| A00000793|         47|Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002484|         74|Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001695|         76|Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001726|         48|Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002881|         70|Monthly Issuance|2013-01-04|2013|    1|  4|
| A00002357|         19|Monthly Issuance|2013-01

In [11]:
# Select 'account_id' and 'frequency' columns, then filter rows where 'frequency' equals 'Monthly Issuance', and display the results.
df.select(col("account_id"), col("frequency")).where(col("frequency")=="Monthly Issuance").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 13, Finished, Available)

+----------+----------------+
|account_id|       frequency|
+----------+----------------+
| A00000576|Monthly Issuance|
| A00003818|Monthly Issuance|
| A00000704|Monthly Issuance|
| A00002378|Monthly Issuance|
| A00002632|Monthly Issuance|
| A00001972|Monthly Issuance|
| A00000793|Monthly Issuance|
| A00002484|Monthly Issuance|
| A00001695|Monthly Issuance|
| A00001726|Monthly Issuance|
| A00002881|Monthly Issuance|
| A00002357|Monthly Issuance|
| A00002177|Monthly Issuance|
| A00000652|Monthly Issuance|
| A00009635|Monthly Issuance|
| A00001844|Monthly Issuance|
| A00001926|Monthly Issuance|
| A00002393|Monthly Issuance|
| A00003510|Monthly Issuance|
| A00001276|Monthly Issuance|
+----------+----------------+
only showing top 20 rows



**Filter using Filter function**

filter and where functions are the same

In [12]:
# Select 'account_id' and 'frequency' columns, then filter rows where 'frequency' equals 'Monthly Issuance', and display the results.
df.select(col("account_id"), col("frequency")).filter(col("frequency")=="Monthly Issuance").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 14, Finished, Available)

+----------+----------------+
|account_id|       frequency|
+----------+----------------+
| A00000576|Monthly Issuance|
| A00003818|Monthly Issuance|
| A00000704|Monthly Issuance|
| A00002378|Monthly Issuance|
| A00002632|Monthly Issuance|
| A00001972|Monthly Issuance|
| A00000793|Monthly Issuance|
| A00002484|Monthly Issuance|
| A00001695|Monthly Issuance|
| A00001726|Monthly Issuance|
| A00002881|Monthly Issuance|
| A00002357|Monthly Issuance|
| A00002177|Monthly Issuance|
| A00000652|Monthly Issuance|
| A00009635|Monthly Issuance|
| A00001844|Monthly Issuance|
| A00001926|Monthly Issuance|
| A00002393|Monthly Issuance|
| A00003510|Monthly Issuance|
| A00001276|Monthly Issuance|
+----------+----------------+
only showing top 20 rows



**Group By function**

In [13]:
# Group the DataFrame by 'frequency', count occurrences of 'account_id', rename the count column, and display the results.
df.groupBy(col("frequency")).agg(count(col("account_id")).alias("count of accounts")).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 15, Finished, Available)

+--------------------+-----------------+
|           frequency|count of accounts|
+--------------------+-----------------+
|Issuance After Tr...|               93|
|    Monthly Issuance|             4167|
|     Weekly Issuance|              240|
+--------------------+-----------------+



In [14]:
# Group the DataFrame by 'frequency', find the maximum 'parseddate' for each group, rename the result to 'maxdate', and display the results.
df.groupBy(col("frequency")).agg(max(col("parseddate")).alias("maxdate")).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 16, Finished, Available)

+--------------------+----------+
|           frequency|   maxdate|
+--------------------+----------+
|Issuance After Tr...|2017-12-05|
|    Monthly Issuance|2017-12-29|
|     Weekly Issuance|2017-12-26|
+--------------------+----------+



**JOIN**

In [15]:
# Load a CSV file into a DataFrame named 'acct' with headers, representing account information.
acct=spark.read.format("csv").option("header","true").load("Files/completedacct.csv")

# Load another CSV file into a DataFrame named 'disposition' with headers, representing disposition information.
disposition=spark.read.format("csv").option("header","true").load("Files/completeddisposition.csv")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 17, Finished, Available)

In [16]:
display(acct.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 18, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+
|account_id|district_id|           frequency|parseddate|year|month|day|
+----------+-----------+--------------------+----------+----+-----+---+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|
| A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001695|         76|    Monthly Issuance|2013-01-03|2013|    1|  3|
+----------+-----------+--------------------+----------+----+---

In [17]:
display(disposition.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 19, Finished, Available)

+---------+---------+----------+-----+
|  disp_id|client_id|account_id| type|
+---------+---------+----------+-----+
|D00000001|C00000001| A00000001|Owner|
|D00000002|C00000002| A00000002|Owner|
|D00000003|C00000003| A00000002| User|
|D00000004|C00000004| A00000003|Owner|
|D00000005|C00000005| A00000003| User|
|D00000006|C00000006| A00000004|Owner|
|D00000007|C00000007| A00000005|Owner|
|D00000008|C00000008| A00000006|Owner|
|D00000009|C00000009| A00000007|Owner|
|D00000010|C00000010| A00000008|Owner|
+---------+---------+----------+-----+
only showing top 10 rows



In [18]:
# Perform an inner join between 'acct' and 'disposition' DataFrames on the 'account_id' column and display the results.
acct.join(disposition,on=acct.account_id==disposition.account_id,how="inner").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 20, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+---------+---------+----------+-----+
|account_id|district_id|           frequency|parseddate|year|month|day|  disp_id|client_id|account_id| type|
+----------+-----------+--------------------+----------+----+-----+---+---------+---------+----------+-----+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| A00000576| User|
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000692|C00000692| A00000576|Owner|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004602|C00004602| A00003818| User|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004601|C00004601| A00003818|Owner|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000845|C00000845| A00000704| User|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000844|C00000844| A00000704|Owner|
| A00002378|       

In [19]:
# Join 'acct' and 'disposition' DataFrames on 'account_id' with an inner join, then drop duplicate 'account_id' from 'disposition', and store the result in 'df_joined'.
df_joined=acct.join(disposition,on=acct.account_id==disposition.account_id,how="inner").drop(disposition.account_id)

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 21, Finished, Available)

In [20]:
display(df_joined.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 22, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|           frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| User|
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000692|C00000692|Owner|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004602|C00004602| User|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004601|C00004601|Owner|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000845|C00000845| User|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000844|C00000844|Owner|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|D00002873|C00002873|Owner|
| A00002632|        

**WHERE / FILTER**

In [21]:
from pyspark.sql.functions import col

# Filter 'df_joined' for rows where the 'year' column equals 2013, limit the results to 10 rows, and display them.
df_joined.where(col("year")==2013).limit(10).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 23, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|           frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| User|
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000692|C00000692|Owner|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004602|C00004602| User|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004601|C00004601|Owner|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000845|C00000845| User|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000844|C00000844|Owner|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|D00002873|C00002873|Owner|
| A00002632|        

In [22]:
# Filter 'df_joined' DataFrame for records from the year 2013, limit to the first 10, and display them.
df_joined.filter(col("year")==2013).limit(10).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 24, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|           frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| User|
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000692|C00000692|Owner|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004602|C00004602| User|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004601|C00004601|Owner|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000845|C00000845| User|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000844|C00000844|Owner|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|D00002873|C00002873|Owner|
| A00002632|        

**Union**

In [23]:
# Create a new DataFrame 'df1' by filtering 'df_joined' for records from the year 2013 and limiting to the first 10 entries.
df1=df_joined.where(col("year")==2013).limit(10)

# Create another DataFrame 'df2' by filtering 'df_joined' for records from the year 2014 and limiting to the first 10 entries.
df2=df_joined.where(col("year")==2014).limit(10)

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 25, Finished, Available)

In [24]:
# Combine the rows from DataFrames 'df1' and 'df2' and display the resulting DataFrame.
df1.union(df2).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 26, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|           frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| User|
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000692|C00000692|Owner|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004602|C00004602| User|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004601|C00004601|Owner|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000845|C00000845| User|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000844|C00000844|Owner|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|D00002873|C00002873|Owner|
| A00002632|        

In [25]:
# Create 'df3' by selecting 'account_id' and 'district_id' from 'df1', limiting to the first 3 records.
df3=df1.selectExpr("account_id","district_id").limit(3)

# Create 'df4' by selecting 'district_id' and 'account_id' from 'df2', ensuring the column order is reversed, and limit to the first 3 records.
df4=df2.selectExpr("district_id","account_id").limit(3)

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 27, Finished, Available)

In [26]:
df3.show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 28, Finished, Available)

+----------+-----------+
|account_id|district_id|
+----------+-----------+
| A00000576|         55|
| A00000576|         55|
| A00003818|         74|
+----------+-----------+



In [27]:
df4.show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 29, Finished, Available)

+-----------+----------+
|district_id|account_id|
+-----------+----------+
|         14| A00002789|
|         18| A00001552|
|         72| A00000628|
+-----------+----------+



In [28]:
# Combine rows from DataFrames 'df3' and 'df4' ensuring column names align correctly, and display the combined DataFrame.
df3.union(df4).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 30, Finished, Available)

+----------+-----------+
|account_id|district_id|
+----------+-----------+
| A00000576|         55|
| A00000576|         55|
| A00003818|         74|
|        14|  A00002789|
|        18|  A00001552|
|        72|  A00000628|
+----------+-----------+



In [29]:
# Combine rows from 'df3' and 'df4' based on column names, not positions, and display the resulting DataFrame.
df3.unionByName(df4).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 31, Finished, Available)

+----------+-----------+
|account_id|district_id|
+----------+-----------+
| A00000576|         55|
| A00000576|         55|
| A00003818|         74|
| A00002789|         14|
| A00001552|         18|
| A00000628|         72|
+----------+-----------+



**Drop Duplicates & Distinct Functions**

In [30]:
display(df_joined.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 32, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|           frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| User|
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000692|C00000692|Owner|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004602|C00004602| User|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004601|C00004601|Owner|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000845|C00000845| User|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000844|C00000844|Owner|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|D00002873|C00002873|Owner|
| A00002632|        

In [31]:
# Drop the 'district_id' column from 'df_joined' DataFrame and display the updated DataFrame.
df_joined.drop("district_id").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 33, Finished, Available)

+----------+--------------------+----------+----+-----+---+---------+---------+-----+
|account_id|           frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+--------------------+----------+----+-----+---+---------+---------+-----+
| A00000576|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| User|
| A00000576|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000692|C00000692|Owner|
| A00003818|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004602|C00004602| User|
| A00003818|    Monthly Issuance|2013-01-01|2013|    1|  1|D00004601|C00004601|Owner|
| A00000704|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000845|C00000845| User|
| A00000704|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000844|C00000844|Owner|
| A00002378|    Monthly Issuance|2013-01-01|2013|    1|  1|D00002873|C00002873|Owner|
| A00002632|    Monthly Issuance|2013-01-02|2013|    1|  2|D00003177|C00003177|Owner|
| A00001972|    Monthly Issuance|2013-01-02|2013|    1

In [32]:
# Remove 'district_id' and 'parseddate' columns from 'df_joined' DataFrame and display the result.
df_joined.drop("district_id","parseddate").show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 34, Finished, Available)

+----------+----------------+----+-----+---+---------+---------+-----+
|account_id|       frequency|year|month|day|  disp_id|client_id| type|
+----------+----------------+----+-----+---+---------+---------+-----+
| A00000001|Monthly Issuance|2015|    3| 24|D00000001|C00000001|Owner|
| A00000002|Monthly Issuance|2013|    2| 26|D00000002|C00000002|Owner|
| A00000002|Monthly Issuance|2013|    2| 26|D00000003|C00000003| User|
| A00000003|Monthly Issuance|2017|    7|  7|D00000004|C00000004|Owner|
| A00000003|Monthly Issuance|2017|    7|  7|D00000005|C00000005| User|
| A00000004|Monthly Issuance|2016|    2| 21|D00000006|C00000006|Owner|
| A00000005|Monthly Issuance|2017|    5| 30|D00000007|C00000007|Owner|
| A00000006|Monthly Issuance|2014|    9| 27|D00000008|C00000008|Owner|
| A00000007|Monthly Issuance|2016|   11| 24|D00000009|C00000009|Owner|
| A00000008|Monthly Issuance|2015|    9| 21|D00000010|C00000010|Owner|
| A00000008|Monthly Issuance|2015|    9| 21|D00000011|C00000011| User|
| A000

In [33]:
# Select unique 'frequency' and 'year' pairs from 'df_joined' and store them in 'df1', then display the DataFrame.
df1=df_joined.select("frequency","year").distinct()
display(df1.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 35, Finished, Available)

+--------------------+----+
|           frequency|year|
+--------------------+----+
|    Monthly Issuance|2017|
|    Monthly Issuance|2014|
|     Weekly Issuance|2017|
|    Monthly Issuance|2016|
|    Monthly Issuance|2015|
|     Weekly Issuance|2013|
|     Weekly Issuance|2015|
|Issuance After Tr...|2014|
|Issuance After Tr...|2015|
|Issuance After Tr...|2017|
+--------------------+----+
only showing top 10 rows



In [34]:
# Create a new DataFrame 'df2' by removing duplicate rows from 'df_joined' and display it.
df2=df_joined.distinct()
display(df2.show(10))


StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 36, Finished, Available)

+----------+-----------+----------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|       frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+----------------+----------+----+-----+---+---------+---------+-----+
| A00003069|         66|Monthly Issuance|2013-03-20|2013|    3| 20|D00003711|C00003711|Owner|
| A00000790|         54|Monthly Issuance|2013-05-11|2013|    5| 11|D00000946|C00000946|Owner|
| A00002213|         68|Monthly Issuance|2013-07-11|2013|    7| 11|D00002679|C00002679|Owner|
| A00000854|         62|Monthly Issuance|2013-09-21|2013|    9| 21|D00001029|C00001029|Owner|
| A00002269|          6|Monthly Issuance|2013-11-04|2013|   11|  4|D00002748|C00002748| User|
| A00000352|         39|Monthly Issuance|2014-07-04|2014|    7|  4|D00000430|C00000430|Owner|
| A00007546|         43|Monthly Issuance|2014-09-03|2014|    9|  3|D00009038|C00009346|Owner|
| A00005422|         66|Monthly Issuance|2014-10-22|2014|   

In [35]:
df=df2.limit(10)
df.show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 37, Finished, Available)

+----------+-----------+----------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|       frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+----------------+----------+----+-----+---+---------+---------+-----+
| A00003069|         66|Monthly Issuance|2013-03-20|2013|    3| 20|D00003711|C00003711|Owner|
| A00000790|         54|Monthly Issuance|2013-05-11|2013|    5| 11|D00000946|C00000946|Owner|
| A00002213|         68|Monthly Issuance|2013-07-11|2013|    7| 11|D00002679|C00002679|Owner|
| A00000854|         62|Monthly Issuance|2013-09-21|2013|    9| 21|D00001029|C00001029|Owner|
| A00002269|          6|Monthly Issuance|2013-11-04|2013|   11|  4|D00002748|C00002748| User|
| A00000352|         39|Monthly Issuance|2014-07-04|2014|    7|  4|D00000430|C00000430|Owner|
| A00007546|         43|Monthly Issuance|2014-09-03|2014|    9|  3|D00009038|C00009346|Owner|
| A00005422|         66|Monthly Issuance|2014-10-22|2014|   

In [36]:
# Remove duplicate rows based on the 'frequency' column in 'df' and display the resulting DataFrame.
df.drop_duplicates(["frequency"]).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 38, Finished, Available)

+----------+-----------+----------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|       frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+----------------+----------+----+-----+---+---------+---------+-----+
| A00003069|         66|Monthly Issuance|2013-03-20|2013|    3| 20|D00003711|C00003711|Owner|
+----------+-----------+----------------+----------+----+-----+---+---------+---------+-----+



In [37]:
# Remove duplicate rows based on the 'frequency' and 'year' columns from 'df2' and display the updated DataFrame.
df2.drop_duplicates(["frequency","year"]).show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 39, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
|account_id|district_id|           frequency|parseddate|year|month|day|  disp_id|client_id| type|
+----------+-----------+--------------------+----------+----+-----+---+---------+---------+-----+
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|D00001866|C00001866|Owner|
| A00002628|         64|Issuance After Tr...|2014-02-06|2014|    2|  6|D00003172|C00003172|Owner|
| A00005650|         10|Issuance After Tr...|2015-01-08|2015|    1|  8|D00006829|C00006829|Owner|
| A00003638|         52|Issuance After Tr...|2016-01-10|2016|    1| 10|D00004387|C00004387|Owner|
| A00003005|         42|Issuance After Tr...|2017-01-11|2017|    1| 11|D00003629|C00003629|Owner|
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|D00000693|C00000693| User|
| A00002789|         14|    Monthly Issuance|2014-01-01|2014|    1|  1|D00003368|C00003368|Owner|
| A00003006|        

In [38]:
# Display distinct values from the 'frequency' column in DataFrame 'df'
df.select("frequency").distinct().show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 40, Finished, Available)

+----------------+
|       frequency|
+----------------+
|Monthly Issuance|
+----------------+



**Exporting dataframes to Lakehouse**

In [39]:
# Select 'account_id', 'district_id', and 'frequency' from 'df', and save the result as a Parquet file in 'Files/acct', overwriting existing data.
df.select("account_id","district_id","frequency").write.mode("overwrite").format("parquet").save("Files/acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 41, Finished, Available)

**Also look at different writing methods**
1. overwrite
2. append
3. ignore
4. error

In [40]:
# Save the entire 'df' DataFrame as a Parquet file in 'Files/acct', overwriting any existing data.
df.write.mode("overwrite").format("parquet").save("Files/acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 42, Finished, Available)

In [41]:
# Save selected columns of 'df' as JSON files in 'Files/acct', overwriting existing files.
df.select("account_id","district_id","frequency").write.mode("overwrite").format("json").save("Files/acct")

# Save selected columns of 'df' as CSV files in 'Files/acct', overwriting existing files.
df.select("account_id","district_id","frequency").write.mode("overwrite").format("csv").save("Files/acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 43, Finished, Available)

In [42]:
# Save selected columns from 'df' as Parquet files in 'Files/acct', overwriting existing files.
df.select("account_id","district_id","frequency").write.mode("overwrite").format("parquet").save("Files/acct")

# Append selected columns from 'df' to existing Parquet files in 'Files/acct'.
df.select("account_id","district_id","frequency").write.mode("append").format("parquet").save("Files/acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 44, Finished, Available)

In [43]:
# Save selected columns from 'df' as a Delta table named 'acct', overwriting any existing table with the same name.
df.select("account_id","district_id","frequency").write.mode("overwrite").format("delta").saveAsTable("acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 45, Finished, Available)

**Read delta table data using Spark SQL**

In [44]:
%%sql

SELECT *
FROM acct LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 46, Finished, Available)

<Spark SQL result set with 10 rows and 3 fields>

**Create a delta table using pyspark**

In [45]:
# Import all classes from the delta.tables module for Delta Lake operations.
from delta.tables import *

# Import all data types from the pyspark.sql.types module for defining DataFrame schemas.
from pyspark.sql.types import *

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 47, Finished, Available)

In [46]:
# Create a Delta table named 'Acct' if it does not already exist, with specified columns and types, using the DeltaTable API.
DeltaTable.createIfNotExists(spark)\
          .tableName("acct")\
          .addColumn("accound_id", StringType())\
          .addColumn("district_id", IntegerType())\
          .addColumn("frequency", StructType())\
          .execute()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 48, Finished, Available)

<delta.tables.DeltaTable at 0x776b62f99d20>

In [47]:
df = spark.read.format("csv").option("header","true").load("Files/completedacct.csv")
# df now is a Spark DataFrame containing CSV data from "Files/completedacct.csv".
display(df.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 49, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+
|account_id|district_id|           frequency|parseddate|year|month|day|
+----------+-----------+--------------------+----------+----+-----+---+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|
| A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001695|         76|    Monthly Issuance|2013-01-03|2013|    1|  3|
+----------+-----------+--------------------+----------+----+---

In [48]:
df.selectExpr("account_id","district_id","frequency").write.mode("append").format("delta").saveAsTable("acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 50, Finished, Available)

**If you are getting an error for mismatching datatypes on "district_id", you can use "cast" function to convert datatypes**

In [49]:
df.selectExpr("account_id","district_id","frequency").write.mode("overwrite").format("delta").saveAsTable("acct")

# If datatype of "district_id" is mismatching then you can use cast() function as below

# Cast 'district_id' column to integer type and save the DataFrame as a Delta table named 'acct', appending to existing data.
# df.selectExpr("account_id","cast(district_id as int) as district_id","frequency").write.mode("append").format("delta").saveAsTable("acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 51, Finished, Available)

In [50]:
%%sql

SELECT *
FROM acct LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 52, Finished, Available)

<Spark SQL result set with 10 rows and 3 fields>

**Lets create a delta table and load csv data into the delta table**

In [51]:
from delta.tables import *
from pyspark.sql.types import *

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 53, Finished, Available)

In [52]:
DeltaTable.createIfNotExists(spark)\
          .tableName("acct")\
          .addColumn("accound_id", StringType())\
          .addColumn("district_id", IntegerType())\
          .addColumn("frequency", StringType())\
          .execute()


StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 54, Finished, Available)

<delta.tables.DeltaTable at 0x776b36995180>

In [53]:
df_new = spark.read.format("csv").option("header","true").load("Files/completedacct.csv")
# df now is a Spark DataFrame containing CSV data from "Files/completedacct.csv".
display(df_new.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 55, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+
|account_id|district_id|           frequency|parseddate|year|month|day|
+----------+-----------+--------------------+----------+----+-----+---+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|
| A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001695|         76|    Monthly Issuance|2013-01-03|2013|    1|  3|
+----------+-----------+--------------------+----------+----+---

In [54]:
df_new.selectExpr("account_id","district_id","frequency").write.mode("append").format("delta").saveAsTable("acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 56, Finished, Available)

In [55]:
%%sql

SELECT *
FROM acct LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 57, Finished, Available)

<Spark SQL result set with 10 rows and 3 fields>

**create a delta table with partitions**

In [56]:
from delta.tables import *
from pyspark.sql.types import *

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 58, Finished, Available)

In [57]:
DeltaTable.createIfNotExists(spark)\
          .tableName("acct_1")\
          .addColumn("account_id", StringType())\
          .addColumn("district_id", IntegerType())\
          .addColumn("frequency", StringType())\
          .partitionedBy("frequency")\
          .execute()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 59, Finished, Available)

<delta.tables.DeltaTable at 0x776b62f9ba90>

In [58]:
df = spark.read.format("csv").option("header","true").load("Files/completedacct.csv")
# df now is a Spark DataFrame containing CSV data from "Files/completedacct.csv".
display(df.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 60, Finished, Available)

+----------+-----------+--------------------+----------+----+-----+---+
|account_id|district_id|           frequency|parseddate|year|month|day|
+----------+-----------+--------------------+----------+----+-----+---+
| A00000576|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00003818|         74|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00000704|         55|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002378|         16|    Monthly Issuance|2013-01-01|2013|    1|  1|
| A00002632|         24|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001972|         77|    Monthly Issuance|2013-01-02|2013|    1|  2|
| A00001539|          1|Issuance After Tr...|2013-01-03|2013|    1|  3|
| A00000793|         47|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00002484|         74|    Monthly Issuance|2013-01-03|2013|    1|  3|
| A00001695|         76|    Monthly Issuance|2013-01-03|2013|    1|  3|
+----------+-----------+--------------------+----------+----+---

In [59]:
df.selectExpr("account_id","cast(district_id as int) as district_id","frequency").write.mode("overwrite").format("delta").saveAsTable('acct_1')

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 61, Finished, Available)

- You will see now that there are three partition folders created on acct_1 table folder

- Go to Lakehouse and click "View Files" from acct_1 table


**There is another way you can directly partition data without needing create table script**

In [60]:
df.selectExpr("account_id","cast(district_id as int) as district_id","frequency").write.mode("overwrite").format("delta").partitionBy("frequency").saveAsTable('acct_test')

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 62, Finished, Available)

**Tip on using partition by : always use low cardinatily columns for partitioning**
Ex : Year, Month, Gender

**Managed and External data tables**

Will create a managed table, which means both table and data are in one location.

If we drop acct table, the table and data both will be deleted

In [61]:
df.selectExpr("account_id","district_id").distinct().write.mode("overwrite").format("delta").saveAsTable("acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 63, Finished, Available)

In [62]:
%%sql

DROP TABLE acct

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 64, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

Now will create an external table

Here the delta table will be created in "Tables" folder and data will be stored at "Files" folder

In [63]:
df.selectExpr("account_id","district_id").distinct().write.mode("overwrite").format("delta").saveAsTable("acct", path="Files/acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 65, Finished, Available)

Now if we drop the table like before, but since this is an external table - the table will be dropped but still we have the underlying data.

In [64]:
%%sql

DROP TABLE acct

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 66, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

Reading the underlying external data

In [65]:
df=spark.read.format("delta").load("Files/acct")
display(df.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 67, Finished, Available)

+----------+-----------+---------+
|account_id|district_id|frequency|
+----------+-----------+---------+
| A00001518|         67|     null|
| A00001429|         57|     null|
| A00001382|         48|     null|
| A00002623|         58|     null|
| A00001225|          4|     null|
| A00001098|         47|     null|
| A00001248|         22|     null|
| A00000993|          6|     null|
| A00001368|         22|     null|
| A00002391|          2|     null|
+----------+-----------+---------+
only showing top 10 rows



Now since we have dropped the acct table before, now we need to reattach the delta table with the underlying data

In [66]:
from delta.tables import *
from pyspark.sql.types import *


StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 68, Finished, Available)

In [67]:
DeltaTable.createIfNotExists(spark)\
          .tableName("acct")\
          .addColumn("account_id", StringType())\
          .addColumn("district_id", StringType())\
          .addColumn("frequency", StringType())\
          .location("Files/acct")\
          .execute()

#.location() - its the method where you link the data to the table

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 69, Finished, Available)

<delta.tables.DeltaTable at 0x776b36996950>

In [68]:
%%sql

SELECT *
FROM acct LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 70, Finished, Available)

<Spark SQL result set with 10 rows and 3 fields>

Create a delta table using Spark SQL

In [69]:
df_d = spark.read.format("csv").option("header","true").load("Files/completeddisposition.csv")
# df now is a Spark DataFrame containing CSV data from "Files/completeddisposition.csv".
display(df_d.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 71, Finished, Available)

+---------+---------+----------+-----+
|  disp_id|client_id|account_id| type|
+---------+---------+----------+-----+
|D00000001|C00000001| A00000001|Owner|
|D00000002|C00000002| A00000002|Owner|
|D00000003|C00000003| A00000002| User|
|D00000004|C00000004| A00000003|Owner|
|D00000005|C00000005| A00000003| User|
|D00000006|C00000006| A00000004|Owner|
|D00000007|C00000007| A00000005|Owner|
|D00000008|C00000008| A00000006|Owner|
|D00000009|C00000009| A00000007|Owner|
|D00000010|C00000010| A00000008|Owner|
+---------+---------+----------+-----+
only showing top 10 rows



In [70]:
%%sql

CREATE TABLE IF NOT EXISTS disposition
(
    disp_id varchar(100),
    client_id varchar(100),
    account_id varchar(100),
    type varchar(100)
)
USING DELTA

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 72, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [71]:
df_d.write.mode("overwrite").format("delta").saveAsTable("disposition")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 73, Finished, Available)

In [72]:
%%sql
SELECT * FROM disposition LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 74, Finished, Available)

<Spark SQL result set with 10 rows and 4 fields>

Now with partition

In [73]:
%%sql

CREATE TABLE IF NOT EXISTS account_1
(
    account_id STRING,
    district_id STRING,
    frequency STRING
)
USING DELTA
PARTITIONED BY(frequency)

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 75, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [74]:
df.createOrReplaceTempView("view_account")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 76, Finished, Available)

In [75]:
%%sql
INSERT INTO account_1
SELECT account_id,district_id,frequency FROM view_account

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 77, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [76]:
%%sql
SELECT * FROM account_1 LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 78, Finished, Available)

<Spark SQL result set with 10 rows and 3 fields>

DML operations with Sparkk SQL

In [77]:
df = spark.read.format("csv").option("header","true").load("Files/completeddisposition.csv")
# df now is a Spark DataFrame containing CSV data from "Files/completeddisposition.csv".
display(df.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 79, Finished, Available)

+---------+---------+----------+-----+
|  disp_id|client_id|account_id| type|
+---------+---------+----------+-----+
|D00000001|C00000001| A00000001|Owner|
|D00000002|C00000002| A00000002|Owner|
|D00000003|C00000003| A00000002| User|
|D00000004|C00000004| A00000003|Owner|
|D00000005|C00000005| A00000003| User|
|D00000006|C00000006| A00000004|Owner|
|D00000007|C00000007| A00000005|Owner|
|D00000008|C00000008| A00000006|Owner|
|D00000009|C00000009| A00000007|Owner|
|D00000010|C00000010| A00000008|Owner|
+---------+---------+----------+-----+
only showing top 10 rows



In [78]:
df.write.format("delta").saveAsTable("disposition")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 80, Finished, Available)

In [79]:
%%sql
SELECT * FROM disposition LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 81, Finished, Available)

<Spark SQL result set with 10 rows and 4 fields>

In [80]:
%%sql

DELETE FROM disposition WHERE disp_id="D00000057"

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 82, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

In [81]:
%%sql

UPDATE disposition SET type='owner' WHERE disp_id='D00000011'

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 83, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

In [82]:
%%sql

SELECT * FROM disposition WHERE disp_id='D00000011'

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 84, Finished, Available)

<Spark SQL result set with 1 rows and 4 fields>

In [83]:
%%sql

INSERT INTO disposition
(
    disp_id,client_id,account_id,type
)
VALUES
(
    'Test',
    'Test',
    'Test',
    'Test'
)

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 85, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [84]:
%%sql

SELECT * FROM disposition WHERE type='Test'

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 86, Finished, Available)

<Spark SQL result set with 1 rows and 4 fields>

Create a view from a dataframe

In [85]:
df.createTempView("view_acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 87, Finished, Available)

In [86]:
%%sql

SELECT * FROM view_acct LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 88, Finished, Available)

<Spark SQL result set with 10 rows and 4 fields>

In [87]:
#also you can use "createOrReplaceTempView" to avoid already exist errors

df.createOrReplaceTempView("view_acct")


StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 89, Finished, Available)

Note : To use views outside of this notebook or session scope, you will have to use Global views

In [88]:
df.createGlobalTempView("global_view_acct")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 90, Finished, Available)

Make sure inorder to use any global views, you will have add "global_temp" infront of 

In [89]:
%%sql
SELECT * FROM global_temp.global_view_acct LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 91, Finished, Available)

<Spark SQL result set with 10 rows and 4 fields>

Data transformation using Spark SQL

In [90]:
%%sql

SELECT * FROM disposition LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 92, Finished, Available)

<Spark SQL result set with 10 rows and 4 fields>

In [91]:
%%sql
SELECT * FROM disposition WHERE disp_id='D00000011'

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 93, Finished, Available)

<Spark SQL result set with 1 rows and 4 fields>

In [92]:
%%sql
SELECT COUNT(disp_id) AS Count,type 
FROM disposition
GROUP BY type

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 94, Finished, Available)

<Spark SQL result set with 4 rows and 2 fields>

In [93]:
%%sql

SELECT * 
FROM account_1 acc
INNER JOIN disposition dis ON acc.account_id=dis.account_id 
LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 95, Finished, Available)

<Spark SQL result set with 10 rows and 7 fields>

In [94]:
%%sql

SELECT 
    acc.*,
    dis.disp_id,
    dis.type,
    dis.client_id 
FROM account_1 acc
INNER JOIN disposition dis ON acc.account_id=dis.account_id 
LIMIT 10

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 96, Finished, Available)

<Spark SQL result set with 10 rows and 6 fields>

Read delta table data using Spark

In [95]:
df_acct=spark.read.table("Learn_Lakehouse.acct_1")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 97, Finished, Available)

In [96]:
display(df.show(10))

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 98, Finished, Available)

+---------+---------+----------+-----+
|  disp_id|client_id|account_id| type|
+---------+---------+----------+-----+
|D00000001|C00000001| A00000001|Owner|
|D00000002|C00000002| A00000002|Owner|
|D00000003|C00000003| A00000002| User|
|D00000004|C00000004| A00000003|Owner|
|D00000005|C00000005| A00000003| User|
|D00000006|C00000006| A00000004|Owner|
|D00000007|C00000007| A00000005|Owner|
|D00000008|C00000008| A00000006|Owner|
|D00000009|C00000009| A00000007|Owner|
|D00000010|C00000010| A00000008|Owner|
+---------+---------+----------+-----+
only showing top 10 rows



Also this is possible

In [97]:
df1=spark.sql("SELECT * FROM acct_1 LIMIT 10")

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 99, Finished, Available)

In [99]:
df1.show()

StatementMeta(, 433868e0-a765-4acd-863d-7941dba4232f, 101, Finished, Available)

+----------+-----------+----------------+
|account_id|district_id|       frequency|
+----------+-----------+----------------+
| A00002378|         16|Monthly Issuance|
| A00002632|         24|Monthly Issuance|
| A00001972|         77|Monthly Issuance|
| A00001726|         48|Monthly Issuance|
| A00002177|         62|Monthly Issuance|
| A00001926|         37|Monthly Issuance|
| A00002393|         10|Monthly Issuance|
| A00003510|         41|Monthly Issuance|
| A00003871|         26|Monthly Issuance|
| A00003487|         41|Monthly Issuance|
+----------+-----------+----------------+

