Load the Data from CSV file into the Data Frame

In [0]:

df = spark.read.csv("/FileStore/tables/Order.csv", header=True, inferSchema=True, sep =',')
df.printSchema()
df.show()


root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- SalesChannel: string (nullable = true)
 |-- OrderPriority: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- TotalRevenue: double (nullable = true)
 |-- TotalCost: double (nullable = true)
 |-- TotalProfit: double (nullable = true)

+--------------------+----------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|   Country|       ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+----------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|Middle East and N...|     Libya|      Co

Display the Data using the Databricks Specific Function

In [0]:
display(df)


#Data Type in the pySpark

In [0]:
#Data type                             Value assigned in Python                   API to instantiate
#ByteType                                 	int                                     DataTypes.ByteType
#ShortType                            		int                                     DataTypes.ShortType
#IntegerType                         		int                   					DataTypes.IntegerType
#LongType                           		int                 					DataTypes.LongType
#FloatType                           		float              						DataTypes.FloatType
#DoubleType                          		float                  					DataTypes.DoubleType
#StringType                          		str             						DataTypes.StringType
#BooleanType                          		bool                					DataTypes.BooleanType
#DecimalType                          		decimal.Decimal         				DecimalType
#BinaryType 								bytearray 								BinaryType()
#TimestampType 								datetime.datetime 						TimestampType()
#DateType 									datetime.date 							DateType()
#ArrayType 									List, tuple, or array 					ArrayType(dataType, [nullable])
#MapType 									dict 									MapType(keyType, valueType, [nullable])
#StructType 								List or tuple 							StructType([fields])
#StructField 								 										StructField(name, dataType, [nullable])


#Define schema progarmatically

In [0]:
#Define schema progarmatically
from pyspark.sql.types import *
orderSchema = StructType([StructField("Region", StringType() ,True)
,StructField("Country", StringType() ,True)
,StructField("ItemType", StringType() ,True)
,StructField("SalesChannel", StringType() ,True)
,StructField("OrderPriority", StringType() ,True)
,StructField("OrderID", IntegerType() ,True)
,StructField("UnitsSold", IntegerType() ,True)
,StructField("UnitPrice", DoubleType() ,True)
,StructField("UnitCost", DoubleType() ,True)
,StructField("TotalRevenue", DoubleType() ,True)
,StructField("TotalCost", DoubleType() ,True)
,StructField("TotalProfit", DoubleType() ,True)
])


df = spark.read.load("/FileStore/tables/Order.csv",format="csv", header=True, schema=orderSchema)
df.printSchema()
df.schema

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- SalesChannel: string (nullable = true)
 |-- OrderPriority: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- TotalRevenue: double (nullable = true)
 |-- TotalCost: double (nullable = true)
 |-- TotalProfit: double (nullable = true)

Out[1]: StructType(List(StructField(Region,StringType,true),StructField(Country,StringType,true),StructField(ItemType,StringType,true),StructField(SalesChannel,StringType,true),StructField(OrderPriority,StringType,true),StructField(OrderID,IntegerType,true),StructField(UnitsSold,IntegerType,true),StructField(UnitPrice,DoubleType,true),StructField(UnitCost,DoubleType,true),StructField(TotalRevenue,DoubleType,true),StructField(TotalCost,DoubleType,true),StructField(TotalProfit,DoubleType,true))

#Define schema Declaratively

In [0]:

orderSchema = 'Region String ,Country String ,ItemType String ,SalesChannel String ,OrderPriority String ,OrderID Integer ,UnitsSold Integer ,UnitPrice Double ,UnitCost Double ,TotalRevenue Double ,TotalCost Double ,TotalProfit Double'
                          
df = spark.read.load("/FileStore/tables/Order.csv",format="csv", header=True, schema=orderSchema)
df.printSchema()
#df.schema
#display(df)


root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- SalesChannel: string (nullable = true)
 |-- OrderPriority: string (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- TotalRevenue: double (nullable = true)
 |-- TotalCost: double (nullable = true)
 |-- TotalProfit: double (nullable = true)



Row object

In [0]:
df.collect()

Out[7]: [Row(Region='Middle East and North Africa', Country='Libya', ItemType='Cosmetics', SalesChannel='Offline', OrderPriority='M', OrderID=686800706, UnitsSold=8446, UnitPrice=437.2, UnitCost=263.33, TotalRevenue=3692591.2, TotalCost=2224085.18, TotalProfit=1468506.02),
 Row(Region='North America', Country='Canada', ItemType='Vegetables', SalesChannel='Online', OrderPriority='M', OrderID=185941302, UnitsSold=3018, UnitPrice=154.06, UnitCost=90.93, TotalRevenue=464953.08, TotalCost=274426.74, TotalProfit=190526.34),
 Row(Region='Middle East and North Africa', Country='Libya', ItemType='Baby Food', SalesChannel='Offline', OrderPriority='C', OrderID=246222341, UnitsSold=1517, UnitPrice=255.28, UnitCost=159.42, TotalRevenue=387259.76, TotalCost=241840.14, TotalProfit=145419.62),
 Row(Region='Asia', Country='Japan', ItemType='Cereal', SalesChannel='Offline', OrderPriority='C', OrderID=161442649, UnitsSold=3322, UnitPrice=205.7, UnitCost=117.11, TotalRevenue=683335.4, TotalCost=389039.42,

In [0]:
df.show()

+--------------------+----------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|   Country|       ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+----------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|Middle East and N...|     Libya|      Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|
|       North America|    Canada|     Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|
|Middle East and N...|     Libya|      Baby Food|     Offline|            C|246222341|     1517|   255.28|  159.42|   387259.76| 241840.14|  145419.62|
|                Asia|     Japan|         Cereal|     Offline|            C|161442649|  

# Columns in Spark Dataframe 
 in Scala
$"myColumn"
'myColumn

Columns are just expressions.<br>
Columns and transformations of those columns compile to the same logical plan as
parsed expressions.

In [0]:
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-464658254375611>[0m in [0;36m<module>[0;34m[0m
[1;32m      2[0m [0mcol[0m[0;34m([0m[0;34m"someColumnName"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m [0mcolumn[0m[0;34m([0m[0;34m"someColumnName"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 4[0;31m [0mdf[0m[0;34m.[0m[0mcol[0m[0;34m([0m[0;34m"count"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36m__getattr__[0;34m(self, name)[0m
[1;32m   1664[0m         """
[1;32m   1665[0m         [0;32mif[0m [0mname[0m [0;32mnot[0m [0;32min[0m [0mself[0m[0;34m.[0m[0mcolumns[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m-> 1666[0;31m             raise AttributeError(
[0m[1;32m   1667[0m                 "'%s' object has n

In [0]:
# To get list of columns

df.columns

Out[13]: ['Region',
 'Country',
 'ItemType',
 'SalesChannel',
 'OrderPriority',
 'OrderID',
 'UnitsSold',
 'UnitPrice',
 'UnitCost',
 'TotalRevenue',
 'TotalCost',
 'TotalProfit']

In [0]:
#To get the first row
df.first()

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<command-464658254375613>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mdf[0m[0;34m.[0m[0mfirst[0m[0;34m([0m[0;36m1[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mTypeError[0m: first() takes 1 positional argument but 2 were given

#Create Dataframe and Rows Manually

In [0]:
from pyspark.sql import Row
row1 = Row("Ram", None, 1, True)

# To access the specific attribute within the row use following:
#parallelizedRows = spark.sparkContext.parallelize(newRows)
row1[0]

# Create a dataframe
row1 = Row("Ram", None, 1, True)
myManualSchema = 'Name string, address string not null, id integer, exists string'
manDf = spark.createDataFrame([row1], myManualSchema)
manDf.show()


[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
[0;32m<command-464658254375614>[0m in [0;36m<module>[0;34m[0m
[1;32m      8[0m [0mrow1[0m [0;34m=[0m [0mRow[0m[0;34m([0m[0;34m"Ram"[0m[0;34m,[0m [0;32mNone[0m[0;34m,[0m [0;36m1[0m[0;34m,[0m [0;32mTrue[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      9[0m [0mmyManualSchema[0m [0;34m=[0m [0;34m'Name string, address string not null, id integer, exists string'[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 10[0;31m [0mmanDf[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mcreateDataFrame[0m[0;34m([0m[0;34m[[0m[0mrow1[0m[0;34m][0m[0;34m,[0m [0mmyManualSchema[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     11[0m [0mmanDf[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/sql/session.py[0m in [0;

# Select and SelectExpr (Select Expressions)

In [0]:
from pyspark.sql.functions import expr, col, column
df.select("Region", "Country").show(2)

#select using variety of ways
df.select(
expr("Country"),
col("Region"),
column("ItemType"),
df.OrderID)\
.show(2)

# Select using the alias
df.select(
expr("Country as NewCountry"),
col("Region").alias('New Region'),
column("ItemType"),
df.OrderID)\
.show(2)

#Use selectexpr

df.selectExpr("Country as NewCountry", "Region").show(2)

+--------------------+-------+
|              Region|Country|
+--------------------+-------+
|Middle East and N...|  Libya|
|       North America| Canada|
+--------------------+-------+
only showing top 2 rows

+-------+--------------------+----------+---------+
|Country|              Region|  ItemType|  OrderID|
+-------+--------------------+----------+---------+
|  Libya|Middle East and N...| Cosmetics|686800706|
| Canada|       North America|Vegetables|185941302|
+-------+--------------------+----------+---------+
only showing top 2 rows

+----------+--------------------+----------+---------+
|NewCountry|          New Region|  ItemType|  OrderID|
+----------+--------------------+----------+---------+
|     Libya|Middle East and N...| Cosmetics|686800706|
|    Canada|       North America|Vegetables|185941302|
+----------+--------------------+----------+---------+
only showing top 2 rows

+----------+--------------------+
|NewCountry|              Region|
+----------+-----------------

Converting to Spark Types (Literals)

In [0]:
# in Python
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("NumberOne")).show(2)
df.select(expr("*"), lit(True).alias("MyResult")).show(2)
df.select(expr("*"), lit('Constant').alias("AddedColumn")).show(2)

+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+---------+
|              Region|Country|  ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|NumberOne|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+---------+
|Middle East and N...|  Libya| Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|        1|
|       North America| Canada|Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|        1|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+---------+
only showing top 2 rows

+--------------------+-------+----------+----------

#Add column to dataframe

In [0]:
from pyspark.sql.functions import lit
newdf = df.withColumn("NewColumn", lit(1)).show(2)
newdf = df.withColumn("withinCountry", expr("Country == 'India'"))\

newdf.columns

+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+---------+
|              Region|Country|  ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|NewColumn|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+---------+
|Middle East and N...|  Libya| Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|        1|
|       North America| Canada|Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|        1|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+---------+
only showing top 2 rows

Out[39]: ['Region',
 'Country',
 'ItemType',
 'Sale

#Renaming Columns

In [0]:
newdf = df.withColumnRenamed("Country", "NewCountry")
newdf.columns

Out[40]: ['Region',
 'NewCountry',
 'ItemType',
 'SalesChannel',
 'OrderPriority',
 'OrderID',
 'UnitsSold',
 'UnitPrice',
 'UnitCost',
 'TotalRevenue',
 'TotalCost',
 'TotalProfit']

In [0]:
# Use escape character when column name contains spaces (`)
newdf =df.withColumnRenamed("Country","New Column Name")
newdf.select("`New Column Name`").show()

+---------------+
|New Column Name|
+---------------+
|          Libya|
|         Canada|
|          Libya|
|          Japan|
|           Chad|
|        Armenia|
|        Eritrea|
|     Montenegro|
|        Jamaica|
|           Fiji|
|           Togo|
|     Montenegro|
|         Greece|
|          Sudan|
|       Maldives|
|     Montenegro|
|        Estonia|
|      Greenland|
|     Cape Verde|
|        Senegal|
+---------------+
only showing top 20 rows



d
 #Removing Columns
 
 Use comma to remove multiple columns

In [0]:
df.drop("Country").columns
df.drop("Country", "Region").columns

Out[48]: ['ItemType',
 'SalesChannel',
 'OrderPriority',
 'OrderID',
 'UnitsSold',
 'UnitPrice',
 'UnitCost',
 'TotalRevenue',
 'TotalCost',
 'TotalProfit']

Change Column Type

In [0]:
df.withColumn("UnitsSoldNew", col("UnitsSold").cast("double")).show(2)

+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+------------+
|              Region|Country|  ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|UnitsSoldNew|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+------------+
|Middle East and N...|  Libya| Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|      8446.0|
|       North America| Canada|Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|      3018.0|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+------------+
only showing top 2 rows



#Filter Rows
To filter rows, we create an expression that evaluates to true or false. You then filter out the rows
with an expression that is equal to false.

In [0]:
df.filter(col("UnitsSold") < 1000).show(2)
df.where("UnitsSold < 1000").show(2)
df.filter(df.UnitsSold < 1000).show(2)


+--------------------+--------------------+--------+------------+-------------+---------+---------+---------+--------+------------+---------+-----------+
|              Region|             Country|ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue|TotalCost|TotalProfit|
+--------------------+--------------------+--------+------------+-------------+---------+---------+---------+--------+------------+---------+-----------+
|Australia and Oce...|Federated States ...|  Snacks|      Online|            C|531023156|      407|   152.58|   97.44|    62100.06| 39658.08|   22441.98|
|                Asia|            Mongolia| Clothes|      Online|            L|770478332|      515|   109.28|   35.84|     56279.2|  18457.6|    37821.6|
+--------------------+--------------------+--------+------------+-------------+---------+---------+---------+--------+------------+---------+-----------+
only showing top 2 rows

+--------------------+--------------------+--------

# Unique Rows
extract the unique or distinct values in a DataFrame

In [0]:
df.select("Country", "Region").distinct().count()
df.select("Country").distinct().count()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-464658254375632>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0mdf[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0;34m"Country"[0m[0;34m,[0m [0;34m"Region"[0m[0;34m)[0m[0;34m.[0m[0mdistinct[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0;34m"Country,online"[0m[0;34m)[0m[0;34m.[0m[0mdistinct[0m[0;34m([0m[0;34m)[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mselect[0;34m(self, *cols)[0m
[1;32m   1690[0m         [0;34m[[0m[0mRow[0m[0;34m([0m[0mname[0m[0;34m=[0m[0;34m'Alice'[0m[0;34m,[0m [0mage[0m[0;34m=[0m[0;36m12[0m[0

#Union
DataFramess. To union two DataFrames, you must be sure that they have the same schema and
number of columns; otherwise, the union will fail.

In [0]:
df1 = df.filter("Country = 'Libya'")
df2 = df.filter("Country = 'Canada'")
df1.union(df2).show()

+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|Country|  ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|Middle East and N...|  Libya| Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|
|Middle East and N...|  Libya| Baby Food|     Offline|            C|246222341|     1517|   255.28|  159.42|   387259.76| 241840.14|  145419.62|
|Middle East and N...|  Libya|    Cereal|     Offline|            M|964214932|     1480|    205.7|  117.11|    304436.0|  173322.8|   131113.2|
|Middle East and N...|  Libya| Baby Food|     Offline|            M|635122907|     5837|   255.28|  159.42|  1490069.36| 930534.54|  559

#Sorting
There are two equivalent operations to do this sort
and orderBy that work the exact same way. They accept both column expressions and strings as
well as multiple columns. The default is to sort in ascending order: <br>
You need to use the asc and desc functions if operating
on a column. These allow you to specify the order in which a given column should be sorted: <br>

use asc_nulls_first, desc_nulls_first, asc_nulls_last, or
desc_nulls_last to specify where you would like your null values to appear in an ordered
DataFrame.

In [0]:
df.sort("Country").show(5)
df.orderBy("Country", "UnitsSold").show(5)
df.orderBy(col("ItemType"), col("UnitPrice")).show(5)

#asc and desc function
from pyspark.sql.functions import desc, asc
df.orderBy(expr("Country desc")).show(2)
df.orderBy(col("Country").desc(), col("UnitPrice").asc()).show(2)

+--------------------+-----------+---------+------------+-------------+---------+---------+---------+--------+------------+---------+-----------+
|              Region|    Country| ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue|TotalCost|TotalProfit|
+--------------------+-----------+---------+------------+-------------+---------+---------+---------+--------+------------+---------+-----------+
|Middle East and N...|Afghanistan|   Cereal|      Online|            M|410067975|     7081|    205.7|  117.11|   1456561.7|829255.91|  627305.79|
|Middle East and N...|Afghanistan|Baby Food|      Online|            M|767401731|       80|   255.28|  159.42|     20422.4|  12753.6|     7668.8|
|Middle East and N...|Afghanistan|  Clothes|      Online|            C|446991050|     3440|   109.28|   35.84|    375923.2| 123289.6|   252633.6|
|Middle East and N...|Afghanistan|   Cereal|     Offline|            C|808538234|     3286|    205.7|  117.11|    675930.2|3

#Limit
you
might want just the top ten of some DataFrame

In [0]:
df.limit(5).show()
df.orderBy(expr("Country desc")).limit(6).show()

+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|Country|  ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|Middle East and N...|  Libya| Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|
|       North America| Canada|Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|
|Middle East and N...|  Libya| Baby Food|     Offline|            C|246222341|     1517|   255.28|  159.42|   387259.76| 241840.14|  145419.62|
|                Asia|  Japan|    Cereal|     Offline|            C|161442649|     3322|    205.7|  117.11|    683335.4| 389039.42|  294

#Repartition and Coalesce

Repartition will incur a full shuffle of the data, regardless of whether one is necessary. This
means that you should typically only repartition when the future number of partitions is greater
than your current number of partitions or when you are looking to partition by a set of columns: <br> <br>
Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions. This
operation will shuffle your data into five partitions based on the destination country name, and
then coalesce them (without a full shuffle)

In [0]:
df.rdd.getNumPartitions()
df.repartition(5)

#If you know that you’re going to be filtering by a certain column often, it can be worth repartitioning based on that column:
df.repartition(5, col("Country"))


df.repartition(5, col("Country")).coalesce(2)


Out[63]: 1

Use collect carefully

In [0]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()

+--------------------+----------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|   Country|  ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+----------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|Middle East and N...|     Libya| Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|
|       North America|    Canada|Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|
|Middle East and N...|     Libya| Baby Food|     Offline|            C|246222341|     1517|   255.28|  159.42|   387259.76| 241840.14|  145419.62|
|                Asia|     Japan|    Cereal|     Offline|            C|161442649|     3322|    205.7|  117.11|    6833

Describe to get statistics

In [0]:
display(df.describe())

summary,Region,Country,ItemType,SalesChannel,OrderPriority,OrderID,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit
count,1000,1000,1000,1000,1000,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0
mean,,,,,,549681324.743,5053.988,262.1068400000004,184.96510999999992,1327321.8403300012,936119.2287700002,391202.6115600001
stddev,,,,,,257133358.83775103,2901.375316739389,216.0210604630438,175.2893114559429,1486514.564803918,1162570.7525582854,383640.1858646797
min,Asia,Afghanistan,Baby Food,Offline,C,102928006.0,13.0,9.33,6.92,2043.25,1416.75,532.61
max,Sub-Saharan Africa,Zimbabwe,Vegetables,Online,M,995529830.0,9998.0,668.27,524.96,6617209.54,5204978.4,1726181.36


#String functions

In [0]:
from pyspark.sql.functions import initcap
from pyspark.sql.functions import lower, upper

#Translate the first letter of each word to upper case in the sentence.
df.select(initcap(col("Country"))).show()
df.select(lower(col("Country"))).show()
df.select(upper(col("Country"))).show()

#To concat 
display(df.select(concat(col("Region"), col("Country"))))

#To concat with separater
display(df.select(concat_ws('|',col("Region"), col("Country"))))


#instr(str, substr) - Returns the (1-based) index of the first occurrence of substr in str
display(df.select(instr(col("Region"),"Mi")))

#length(expr) - Returns the character length of string data or number of bytes of binary data
display(df.select(length(col("Region"))))

from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

# Regular Expression
#from pyspark.sql.functions import regexp_replace
#regex_string = "Hello|WHITE|RED|GREEN|BLUE"
#df.select(regexp_replace(col("Country"), regex_string, "COLOR").alias("color_clean"),col("Description")).show(2)


+----------------+
|initcap(Country)|
+----------------+
|           Libya|
|          Canada|
|           Libya|
|           Japan|
|            Chad|
|         Armenia|
|         Eritrea|
|      Montenegro|
|         Jamaica|
|            Fiji|
|            Togo|
|      Montenegro|
|          Greece|
|           Sudan|
|        Maldives|
|      Montenegro|
|         Estonia|
|       Greenland|
|      Cape Verde|
|         Senegal|
+----------------+
only showing top 20 rows

+--------------+
|lower(Country)|
+--------------+
|         libya|
|        canada|
|         libya|
|         japan|
|          chad|
|       armenia|
|       eritrea|
|    montenegro|
|       jamaica|
|          fiji|
|          togo|
|    montenegro|
|        greece|
|         sudan|
|      maldives|
|    montenegro|
|       estonia|
|     greenland|
|    cape verde|
|       senegal|
+--------------+
only showing top 20 rows

+--------------+
|upper(Country)|
+--------------+
|         LIBYA|
|        CANADA|

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-464658254375646>[0m in [0;36m<module>[0;34m[0m
[1;32m     17[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0mregexp_replace[0m[0;34m[0m[0;34m[0m[0m
[1;32m     18[0m [0mregex_string[0m [0;34m=[0m [0;34m"BLACK|WHITE|RED|GREEN|BLUE"[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 19[0;31m df.select(
[0m[1;32m     20[0m [0mregexp_replace[0m[0;34m([0m[0mcol[0m[0;34m([0m[0;34m"Description"[0m[0;34m)[0m[0;34m,[0m [0mregex_string[0m[0;34m,[0m [0;34m"COLOR"[0m[0;34m)[0m[0;34m.[0m[0malias[0m[0;34m([0m[0;34m"color_clean"[0m[0;34m)[0m[0;34m,[0m[0;34m[0m[0;34m[0m[0m
[1;32m     21[0m col("Description")).show(2)

[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mselect[0;34m(

#Date Handling in Spark

Spark will not throw an error if it cannot parse the date; rather, it will just return null.

In [0]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())

display(dateDF)

# Add Subtract dates
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

#Days and Month difference between dates
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today"))).show(1)

dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))\
.select(months_between(col("start"), col("end"))).show(1)

#incorrect date format
dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)

#Give date format
from pyspark.sql.functions import to_date
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.show()

#Handle timestamp to date  format casting
from pyspark.sql.functions import to_timestamp, year, month, dayofmonth, hour, minute, second
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

#get year using the year function with date and timepstamp
cleanDateDF.select(year(to_timestamp(col("date"), dateFormat))).show()

#get month using the month function with date and timepstamp
cleanDateDF.select(month(to_timestamp(col("date"), dateFormat))).show()

#get dayofmonth using the dayofmonth function with date and timepstamp
cleanDateDF.select(dayofmonth(to_timestamp(col("date"), dateFormat))).show()


#get hour using the hour function with date and timepstamp
cleanDateDF.select(hour(to_timestamp(col("date"), dateFormat))).show()


#get minute using the minute function with date and timepstamp
cleanDateDF.select(minute(to_timestamp(col("date"), dateFormat))).show()


#get second using the second function with date and timepstamp
cleanDateDF.select(second(to_timestamp(col("date"), dateFormat))).show()


+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2022-03-04|        2022-03-14|
+------------------+------------------+
only showing top 1 row

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row

+-------------------+-------------------+
|to_date(2016-20-12)|to_date(2017-12-11)|
+-------------------+-------------------+
|               null|         2017-12-11|
+-------------------+-------------------+
only showing top 1 row

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+

+------------------------------+
|to_timestamp(date, yyyy-

id,today,now
0,2022-03-09,2022-03-09T14:35:42.585+0000
1,2022-03-09,2022-03-09T14:35:42.585+0000
2,2022-03-09,2022-03-09T14:35:42.585+0000
3,2022-03-09,2022-03-09T14:35:42.585+0000
4,2022-03-09,2022-03-09T14:35:42.585+0000
5,2022-03-09,2022-03-09T14:35:42.585+0000
6,2022-03-09,2022-03-09T14:35:42.585+0000
7,2022-03-09,2022-03-09T14:35:42.585+0000
8,2022-03-09,2022-03-09T14:35:42.585+0000
9,2022-03-09,2022-03-09T14:35:42.585+0000


#Coalesce

Spark includes a function to allow you to select the first non-null value from a set of columns by
using the coalesce function.

In [0]:
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"), col("CustomerId"))).show()

#ifnull, nullIf, nvl, and nvl2

There are several other SQL functions that you can use to achieve similar things. ifnull allows
you to select the second value if the first is null, and defaults to the first. Alternatively, you could
use nullif, which returns null if the two values are equal or else returns the second if they are
not. nvl returns the second value if the first is null, but defaults to the first. Finally, nvl2 returns
the second value if the first is not null; otherwise, it will return the last specified value

In [0]:
%sql
/*SELECT
ifnull(null, 'return_value'),
nullif('value', 'value'),
nvl(null, 'return_value'),
nvl2('not_null', 'return_value', "else_value")
FROM dfTable LIMIT 1*/

#Drop
The simplest function is drop, which removes rows that contain nulls. The default is to drop any
row in which any value is null:

In [0]:
df.na.drop()

#Specifying "any" as an argument drops a row if any of the values are null. Using “all” drops the row only if all values are null or NaN for that row:
df.na.drop("any")
df.na.drop("all")

#We can also apply this to certain sets of columns by passing in an array of columns:
df.na.drop("all", subset=["Country", "Region"])


Out[82]: DataFrame[Region: string, Country: string, ItemType: string, SalesChannel: string, OrderPriority: string, OrderID: int, UnitsSold: int, UnitPrice: double, UnitCost: double, TotalRevenue: double, TotalCost: double, TotalProfit: double]

#Fill 

Using the fill function, you can fill one or more columns with a set of values. This can be done
by specifying a map—that is a particular value and a set of columns.

In [0]:
#For example, to fill all null values in columns of type String, you might specify the following:
df.na.fill("All Null values become this string")
df.na.fill("all", subset=["Country", "Region"])

fill_cols_vals = {"UnitsSold": 5, "Region" : "No Value"}
df.na.fill(fill_cols_vals)


#Split column to array

In [0]:
from pyspark.sql.functions import split
df.select(split(col("Region"), " ")).show(2)

df.select(split(col("Region"), " ").alias("Region_Array"))\
.selectExpr("Region_Array[0]").show(2)


+--------------------+
|split(Region,  , -1)|
+--------------------+
|[Middle, East, an...|
|    [North, America]|
+--------------------+
only showing top 2 rows

+---------------+
|Region_Array[0]|
+---------------+
|         Middle|
|          North|
+---------------+
only showing top 2 rows



Array Size

In [0]:
# in Python
from pyspark.sql.functions import size
df.select(size(split(col("Region"), " "))).show(2) # shows 5 and 3

#Array contains
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Region"), " "), "North")).show(2)

+--------------------------+
|size(split(Region,  , -1))|
+--------------------------+
|                         5|
|                         2|
+--------------------------+
only showing top 2 rows

+-------------------------------------------+
|array_contains(split(Region,  , -1), North)|
+-------------------------------------------+
|                                       true|
|                                       true|
+-------------------------------------------+
only showing top 2 rows



In [0]:
#Where NotNul function
from pyspark.sql.functions import *
df2 = df.select("Region", "Country", "UnitsSold").where(df.Country == "Libya")
df2.show()
df2 = df.select("Region", "Country", df["UnitsSold"]+1).where(df.Country == "Libya")
df2.show()
df2.show(5, truncate=False)
df2 = df.select("Region", "Country", df["UnitsSold"].isNotNull()).where(df.Country == "Libya")



+--------------------+-------+---------+
|              Region|Country|UnitsSold|
+--------------------+-------+---------+
|Middle East and N...|  Libya|     8446|
|Middle East and N...|  Libya|     1517|
|Middle East and N...|  Libya|     1480|
|Middle East and N...|  Libya|     5837|
|Middle East and N...|  Libya|       64|
|Middle East and N...|  Libya|     4550|
+--------------------+-------+---------+

+--------------------+-------+---------------+
|              Region|Country|(UnitsSold + 1)|
+--------------------+-------+---------------+
|Middle East and N...|  Libya|           8447|
|Middle East and N...|  Libya|           1518|
|Middle East and N...|  Libya|           1481|
|Middle East and N...|  Libya|           5838|
|Middle East and N...|  Libya|             65|
|Middle East and N...|  Libya|           4551|
+--------------------+-------+---------------+

+----------------------------+-------+---------------+
|Region                      |Country|(UnitsSold + 1)|
+-------

In [0]:
#df2= df.filter((df.Country  == "Libya") |  (df.Country  == "Japan"))
#df2.show()
df.createOrReplaceTempView("Order")
df2 = spark.sql ("select * from Order where Country in ('Libya', 'Japan')")
df2.show()

+--------------------+-------+-------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|Country|     ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+-------+-------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|Middle East and N...|  Libya|    Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|
|Middle East and N...|  Libya|    Baby Food|     Offline|            C|246222341|     1517|   255.28|  159.42|   387259.76| 241840.14|  145419.62|
|                Asia|  Japan|       Cereal|     Offline|            C|161442649|     3322|    205.7|  117.11|    683335.4| 389039.42|  294295.98|
|Middle East and N...|  Libya|       Cereal|     Offline|            M|964214932|     1480|    205.7|  117.11|    3044

#explode

The explode function takes a column that consists of arrays and creates one row (with the rest of
the values duplicated) per value in the array

In [0]:
from pyspark.sql.functions import split, explode
df.withColumn("splitted", split(col("Region"), " "))\
.withColumn("exploded", explode(col("splitted")))\
.select("Region", "Country", "exploded").show(2)

+--------------------+-------+--------+
|              Region|Country|exploded|
+--------------------+-------+--------+
|Middle East and N...|  Libya|  Middle|
|Middle East and N...|  Libya|    East|
+--------------------+-------+--------+
only showing top 2 rows



Maps
Maps are created by using the map function and key-value pairs of columns. You then can select
them just like you might select from an array:

In [0]:
from pyspark.sql.functions import create_map
df.select(create_map(col("Region"), col("Country")).alias("complex_map"))\
.show(2)

#query map
#df.select(map(col("Region"), col("Country")).alias("complex_map"))\
#.selectExpr("complex_map['Middle']").show(2)


+--------------------+
|         complex_map|
+--------------------+
|{Middle East and ...|
|{North America ->...|
+--------------------+
only showing top 2 rows



[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<command-4300003863290542>[0m in [0;36m<module>[0;34m[0m
[1;32m      4[0m [0;34m[0m[0m
[1;32m      5[0m [0;31m#query map[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 6[0;31m [0mdf[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0mmap[0m[0;34m([0m[0mcol[0m[0;34m([0m[0;34m"Region"[0m[0;34m)[0m[0;34m,[0m [0mcol[0m[0;34m([0m[0;34m"Country"[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0malias[0m[0;34m([0m[0;34m"complex_map"[0m[0;34m)[0m[0;34m)[0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      7[0m [0;34m.[0m[0mselectExpr[0m[0;34m([0m[0;34m"complex_map['Middle']"[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;36m2[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/sql/column.py[0m in [0;36m__iter__[0;34m(self)[0m


#Aggregate Function

In [0]:
# in Python
from pyspark.sql.functions import count, countDistinct
df.select(count("Region")).show() 

#countDistinct
df.select(countDistinct("Region")).show()
#-- in SQL
#SELECT COUNT(DISTINCT *) FROM DFTABLE

#first and last
#You can get the first and last values from a DataFrame by using these two obviously named functions.

# in Python
from pyspark.sql.functions import first, last
df.select(first("Region"), last("Region")).show()

#min and max
#To extract the minimum and maximum values from a DataFrame, use the min and max functions:

from pyspark.sql.functions import min, max
df.select(min("UnitsSold"), max("UnitsSold")).show()

#sum
#Another simple task is to add all the values in a row using the sum function:
from pyspark.sql.functions import sum
df.select(sum("UnitsSold")).show()

#sumDistinct
#In addition to summing a total, you also can sum a distinct set of values by using the sumDistinct function:
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("UnitsSold")).show() 

#avg

from pyspark.sql.functions import avg
df.select(avg("UnitsSold")).show() 



#Variance and Standard Deviation for ppulation and sample
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-------------+
|count(Region)|
+-------------+
|         1000|
+-------------+

+----------------------+
|count(DISTINCT Region)|
+----------------------+
|                     7|
+----------------------+

+--------------------+------------+
|       first(Region)|last(Region)|
+--------------------+------------+
|Middle East and N...|      Europe|
+--------------------+------------+

+--------------+--------------+
|min(UnitsSold)|max(UnitsSold)|
+--------------+--------------+
|            13|          9998|
+--------------+--------------+

+--------------+
|sum(UnitsSold)|
+--------------+
|       5053988|
+--------------+

+-----------------------+
|sum(DISTINCT UnitsSold)|
+-----------------------+
|                4856458|
+-----------------------+

+--------------+
|avg(UnitsSold)|
+--------------+
|      5053.988|
+--------------+



#Grouping

 this returns another DataFrame and is lazily performed. We do this grouping in two phases. First we specify the column(s) on which we would like to
group, and then we specify the aggregation(s). The first step returns a
RelationalGroupedDataset, and the second step returns a DataFrame.

In [0]:
df.groupBy("Region", "Country").count().show()

#-- in SQL
#SELECT count(*) FROM dfTable GROUP BY InvoiceNo, CustomerId

# in Python
#Rather than passing that function as an expression into a select statement, we specify it as within agg. This makes it possible for you to pass-in arbitrary expressions that just need to have some aggregation specified. You can even do things like alias a column after transforming it for later use in your data flow:

from pyspark.sql.functions import count
df.groupBy("Region").agg(
count("UnitsSold").alias("quan"),
expr("count(UnitsSold)")).show()


+--------------------+--------------------+-----+
|              Region|             Country|count|
+--------------------+--------------------+-----+
|                Asia|               Japan|    8|
|Middle East and N...|             Morocco|    5|
|  Sub-Saharan Africa|            Ethiopia|    5|
|              Europe|              Russia|    7|
|Middle East and N...|                Iran|    8|
|Central America a...|                Cuba|   11|
|  Sub-Saharan Africa|         Seychelles |    5|
|  Sub-Saharan Africa|            Tanzania|    4|
|  Sub-Saharan Africa|               Gabon|    4|
|                Asia|           Singapore|    3|
|  Sub-Saharan Africa|              Zambia|    2|
|  Sub-Saharan Africa|Central African R...|    7|
|  Sub-Saharan Africa|         South Sudan|    8|
|              Europe|             Belgium|    7|
|              Europe|              Kosovo|    1|
|              Europe|          San Marino|    2|
|              Europe|      United Kingdom|    5|


#Joins

In [0]:
#Inner joins are the default join, so we just need to specify our left DataFrame and join the right in the JOIN expression:

person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")

joinExpression = person["graduate_program"] == graduateProgram['id']
person.join(graduateProgram, joinExpression).show()


# specify the join type
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()

#Outer Joins
#Outer joins evaluate the keys in both of the DataFrames or tables and includes (and joins together) the rows that evaluate to true or false. If there is no equivalent row in either the left or right DataFrame, Spark will insert null:

joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()

#Left Outer Joins
#Left outer joins evaluate the keys in both of the DataFrames or tables and includes all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame. If there is no equivalent row in the right DataFrame, Spark will insert null:

joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()


#Right Outer Joins
#Right outer joins evaluate the keys in both of the DataFrames or tables and includes all rows from the right DataFrame as well as any rows in the left DataFrame that have a match in the right DataFrame. If there is no equivalent row in the left DataFrame, Spark will insert null:

joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()

#Natural Joins
#Natural joins make implicit guesses at the columns on which you would like to join. It finds matching columns and returns the results. Left, right, and outer natural joins are all supported.

#-- in SQL
#SELECT * FROM graduateProgram NATURAL JOIN person

#Cross (Cartesian) Joins
#The last of our joins are cross-joins or cartesian products. Cross-joins in simplest terms are inner joins that do not specify a predicate. Cross joins will join every single row in the left DataFrame to ever single row in the right DataFrame. This will cause an absolute explosion in the number of rows contained in the resulting DataFrame. If you have 1,000 rows in each DataFrame, the crossjoin of these will result in 1,000,000 (1,000 x 1,000) rows. For this reason, you must very explicitly state that you want a cross-join by using the cross join keyword:

joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()

#-- in SQL
#SELECT * FROM graduateProgram CROSS JOIN person
#ON graduateProgram.id = person.graduate_program

#If you truly intend to have a cross-join, you can call that out explicitly:
person.crossJoin(graduateProgram).show()

#Write to dataframe <br>

append::           Appends the output files to the list of files that already exist at that location<br>
overwrite::         Will completely overwrite any data that already exists there<br>
errorIfExists::     Throws an error and fails the write if data or files already exist at the specified location<br>
ignore::            If data or files exist at the location, do nothing with the current DataFrame<br>

In [0]:
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()




Parquet Format

Parquet Files
Parquet is an open source column-oriented data store that provides a variety of storage
optimizations, especially for analytics workloads. It provides columnar compression, which
saves storage space and allows for reading individual columns instead of entire files. It is a file
format that works exceptionally well with Apache Spark and is in fact the default file format. We
recommend writing data out to Parquet for long-term storage because reading from a Parquet file
will always be more efficient than JSON or CSV. Another advantage of Parquet is that it
supports complex types. This means that if your column is an array (which would fail with a
CSV file, for example), map, or struct, you’ll still be able to read and write that file without
issue. Here’s how to specify Parquet as the read format:

In [0]:
spark.read.format("parquet")\
.load("/data/flight-data/parquet/2010-summary.parquet").show(5)

csvFile.write.format("parquet").mode("overwrite")\
.save("/tmp/my-parquet-file.parquet")

#Explain

In [0]:
dbDataFrame.explain()

In [0]:
#Filter Functions
df2= df.filter(df.Country != "Libya")
df2.show()

#Multiple Condition
df2= df.filter((df.Country  == "Libya") |  (df.Country  == "Japan"))
df2.show()

#List of values filter
li=["Libya","Japan"]
df.filter(df.Country.isin(li)).show()

#Like Filter
df.filter(df.Country.like("%J%")).show()

#Regular Expression Filter
df.filter(df.Country.rlike("(?i)^*L$")).show()

#Assuming that one of the column in dataframe is Array then You can run filter using the below code
#from pyspark.sql.functions import array_contains
#df.filter(array_contains(df.phoneNumbers,"123")).show()




+--------------------+--------------------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|             Country|       ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+--------------------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|       North America|              Canada|     Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|
|                Asia|               Japan|         Cereal|     Offline|            C|161442649|     3322|    205.7|  117.11|    683335.4| 389039.42|  294295.98|
|  Sub-Saharan Africa|                Chad|         Fruits|     Offline|            H|645713555|     9845|     9.33|    6.92|    91853.85|   68127.4|   23726.45|
|              Europe|      

In [0]:
df2 =df.withColumn("UnitSoldNew",df.UnitsSold+1)
df3 = df2.select("UnitSoldNew")
df4 = df3.drop()
df4.show()



+-----------+
|UnitSoldNew|
+-----------+
|       8447|
|       3019|
|       1518|
|       3323|
|       9846|
|       9529|
|       2845|
|       7300|
|       2429|
|       4801|
|       3013|
|       2695|
|       1509|
|       4147|
|       7333|
|       4821|
|       2398|
|       2881|
|       1118|
|       8990|
+-----------+
only showing top 20 rows



In [0]:
#Add Columns
df2 =df.withColumn("UnitSoldNew",df.UnitsSold+1)
df2.show()

# Column Rename
df2 = df2.withColumnRenamed("UnitSoldNew","UnitSoldRenamed")
df2.show()

#Column Drop
df2 = df2.drop("UnitSoldRenamed")
#or
df2 = df2.drop(df2.UnitsSold)
df2.show()

#Change DataType of Column
df2 = df.withColumn("UnitsSold",df.UnitsSold.cast("Double")).show()
df2.printSchema()

+--------------------+----------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+-----------+
|              Region|   Country|       ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|UnitSoldNew|
+--------------------+----------+---------------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+-----------+
|Middle East and N...|     Libya|      Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|       8447|
|       North America|    Canada|     Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|       3019|
|Middle East and N...|     Libya|      Baby Food|     Offline|            C|246222341|     1517|   255.28|  159.42|   387259.76| 241840.14|  145419.62|       1518|
|               

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-1177475708183475>[0m in [0;36m<module>[0;34m[0m
[1;32m     15[0m [0;31m#Change DataType of Column[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m     16[0m [0mdf2[0m [0;34m=[0m [0mdf[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m"UnitsSold"[0m[0;34m,[0m[0mdf[0m[0;34m.[0m[0mUnitsSold[0m[0;34m.[0m[0mcast[0m[0;34m([0m[0;34m"Double"[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 17[0;31m [0mdf2[0m[0;34m.[0m[0mprintSchema[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mAttributeError[0m: 'NoneType' object has no attribute 'printSchema'

In [0]:
#Show Function Variation
df.show(n=3,truncate=25,vertical=True)
df.show()
df.show(10)
df.show(truncate=False)

#Usually, collect() is used to retrieve the action output when you have very small result set and calling collect() on an RDD/DataFrame with a bigger result set causes out of memory as it returns the entire dataset (from all workers) to the driver hence we should avoid calling collect() on a larger dataset.


-RECORD 0----------------------------------
 Region        | Middle East and North ... 
 Country       | Libya                     
 ItemType      | Cosmetics                 
 SalesChannel  | Offline                   
 OrderPriority | M                         
 OrderID       | 686800706                 
 UnitsSold     | 8446                      
 UnitPrice     | 437.2                     
 UnitCost      | 263.33                    
 TotalRevenue  | 3692591.2                 
 TotalCost     | 2224085.18                
 TotalProfit   | 1468506.02                
-RECORD 1----------------------------------
 Region        | North America             
 Country       | Canada                    
 ItemType      | Vegetables                
 SalesChannel  | Online                    
 OrderPriority | M                         
 OrderID       | 185941302                 
 UnitsSold     | 3018                      
 UnitPrice     | 154.06                    
 UnitCost      | 90.93          

In [0]:
#String Functions

from pyspark.sql import functions as F

df2 =df.select("Region","Country",F.when(df.UnitsSold > 2000, 1).otherwise(0))
df2.show()

#check isin
df2 = df[df.Country.isin("Libiya" ,"Japan")]
df2.show()

#Like
df2 = df.select("Region","Country", df.Country.like("L" ))
df2.show()

#StartsWith
df2 = df.select("Region","Country", df.Country.startswith("L"))
df2.show()
 
df2 = df.select("Region","Country", df.Country.endswith("L"))
df2.show()





In [0]:
#Missing Data

#Replace null with given value for the list of columns
df2 = df.fillna(value=0,subset=["Country"])
df2.show()

#Another way to replace
df2= df.na.fill(value=0, subset=["Country"])
df2.show()

#Replace null for all columns
df2 = df.na.fill(0)
df2.show()


#drop all null rows
df2 = df.na.drop()
df2.show() 

#replace value
df2 = df.na.replace(10, 20)

 .show() 



In [0]:
#Distinct / Remove Duplicate
df2 = df.dropDuplicates()  
dropDisDF = df.dropDuplicates(["Country","UnitsSold"])
df2 =df.distinct()



In [0]:
#Sorting and Order By
df.sort("Country","Region").show(truncate=False)
df.sort(df.Country,df.Region).show(truncate=False)

#Ascending / descending
df.sort(df.Country.asc(),df.Region.desc()).show(truncate=False)

#Order bY function is also works same as sort
df.orderBy(df.Country.asc(),df.Region.desc()).show(truncate=False)




In [0]:
from pyspark.sql.functions import col,sum,avg,max
#group by
df.groupBy(df.Country).count().show()

#Multiple column grouping
df.groupBy(df.Country, df.Region).sum("UnitsSold").show()

#Multiple aggregation
df.groupBy(df.Country, df.Region).sum("UnitsSold","UnitCost").show()

#Multiple Different Type of aggregation
df.printSchema()
df.groupBy(df.Country, df.Region).sum("UnitsSold").where(col("sum(UnitsSold)")>1000).show()




In [0]:
%fs
ls 

path,name,size
dbfs:/FileStore/,FileStore/,0
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-results/,databricks-results/,0
dbfs:/order_parquet_format/,order_parquet_format/,0
dbfs:/tmp/,tmp/,0
dbfs:/user/,user/,0


In [0]:
%sql
--CREATE DATABASE sample

select * from OrderTable18Dec

Region,Country,ItemType,SalesChannel,OrderPriority,OrderID,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit
Middle East and North Africa,Libya,Cosmetics,Offline,M,686800706,8446,437.2,263.33,3692591.2,2224085.18,1468506.02
North America,Canada,Vegetables,Online,M,185941302,3018,154.06,90.93,464953.08,274426.74,190526.34
Middle East and North Africa,Libya,Baby Food,Offline,C,246222341,1517,255.28,159.42,387259.76,241840.14,145419.62
Asia,Japan,Cereal,Offline,C,161442649,3322,205.7,117.11,683335.4,389039.42,294295.98
Sub-Saharan Africa,Chad,Fruits,Offline,H,645713555,9845,9.33,6.92,91853.85,68127.4,23726.45
Europe,Armenia,Cereal,Online,H,683458888,9528,205.7,117.11,1959909.6,1115824.08,844085.52
Sub-Saharan Africa,Eritrea,Cereal,Online,H,679414975,2844,205.7,117.11,585010.8,333060.84,251949.96
Europe,Montenegro,Clothes,Offline,M,208630645,7299,109.28,35.84,797634.72,261596.16,536038.56
Central America and the Caribbean,Jamaica,Vegetables,Online,H,266467225,2428,154.06,90.93,374057.68,220778.04,153279.64
Australia and Oceania,Fiji,Vegetables,Offline,H,118598544,4800,154.06,90.93,739488.0,436464.0,303024.0


In [0]:

#CREATE DATABASE sample
#df.write.mode("overwrite").saveAsTable("OrderTable18Dec")
dbutils.fs("ls")

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<command-4135622570817961>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m#CREATE DATABASE sample[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;31m#df.write.mode("overwrite").saveAsTable("OrderTable18Dec")[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 3[0;31m [0mdbutils[0m[0;34m.[0m[0mfs[0m[0;34m([0m[0;34m"ls"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mTypeError[0m: __call__() takes 1 positional argument but 2 were given

In [0]:
#df.write.mode("overwrite").saveAsTable("OrdeYabler")
# dataframe.write.mode("overwrite").option("path","<your-storage-path>").saveAsTable("<example-table>")   Unmanaged Overwrite
df.write.mode("overwrite").partitionBy("Country").saveAsTable("OrderPartition18Dec")

In [0]:
%fs
rm /user/hive/warehouse/ordeyabler/part-00000-dd01437f-1f50-4cc6-91dd-f7b1b194476e-c000.snappy.parquet

In [0]:
%sql
--REFRESH TABLE OrdeYabler
--select * from OrdeYabler
--Drop table OrdeYabler
--CRUD
-- SELECT * FROM OrderPartition18Dec WHERE OrderID=686800706
-- DESCRIBE history OrderPartition18Dec

SELECT * FROM OrderPartition18Dec VERSION AS OF 1 WHERE OrderID=686800706
UPDATE  VERSION AS OF 0 SET Country = 'Libya' WHERE  OrderID=686800706


 --SELECT * FROM order_delta WHERE OrderID=686800706
 -- DELETE FROM OrderPartition18Dec WHERE OrderID=686800706
-- Confirm the user's data was deleted
/*SELECT * FROM order_delta WHERE OrderID=686800706

INSERT INTO order_delta
SELECT * FROM order_delta VERSION AS OF 0
WHERE OrderID=686800706

UPDATE  VERSION AS OF 0 SET Country = 'Libya' WHERE  OrderID=686800706


-- Vacuum deletes all files no longer needed by the current version of the table.
VACUUM  VERSION AS OF 0

CACHE SELECT * FROM  VERSION AS OF 0
OPTIMIZE  VERSION AS OF 0 ZORDER BY Country


Region,Country,ItemType,SalesChannel,OrderPriority,OrderID,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit


In [0]:
df.write.format("delta").mode("overwrite").saveAsTable("order_delta5")


In [0]:
%sql

select * from order_delta5;

Region,Country,ItemType,SalesChannel,OrderPriority,OrderID,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit
Middle East and North Africa,Libya,Cosmetics,Offline,M,686800706,8446,437.2,263.33,3692591.2,2224085.18,1468506.02
North America,Canada,Vegetables,Online,M,185941302,3018,154.06,90.93,464953.08,274426.74,190526.34
Middle East and North Africa,Libya,Baby Food,Offline,C,246222341,1517,255.28,159.42,387259.76,241840.14,145419.62
Asia,Japan,Cereal,Offline,C,161442649,3322,205.7,117.11,683335.4,389039.42,294295.98
Sub-Saharan Africa,Chad,Fruits,Offline,H,645713555,9845,9.33,6.92,91853.85,68127.4,23726.45
Europe,Armenia,Cereal,Online,H,683458888,9528,205.7,117.11,1959909.6,1115824.08,844085.52
Sub-Saharan Africa,Eritrea,Cereal,Online,H,679414975,2844,205.7,117.11,585010.8,333060.84,251949.96
Europe,Montenegro,Clothes,Offline,M,208630645,7299,109.28,35.84,797634.72,261596.16,536038.56
Central America and the Caribbean,Jamaica,Vegetables,Online,H,266467225,2428,154.06,90.93,374057.68,220778.04,153279.64
Australia and Oceania,Fiji,Vegetables,Offline,H,118598544,4800,154.06,90.93,739488.0,436464.0,303024.0


In [0]:
%sql
CREATE TABLE loans_delta2
USING delta
AS SELECT * FROM parquet.'/tmp/delta_demo/loans_parquet'

In [0]:
%sql 
-- Use CONVERT TO DELTA to convert Parquet files to Delta Lake format in place
CONVERT TO DELTA parquet. /tmp/delta_demo/loans_parquet`

In [0]:
%sql 
DESCRIBE HISTORY order_delta
--View the Delta Lake transaction log

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,2021-12-16T02:15:36.000+0000,4886020049249683,azurelibblog@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(4135622570817943),1216-010453-8kabofbv,,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 36124, numOutputRows -> 1000)",


In [0]:
#Use Schema Evolution to add new columns to schema
new_data.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("loans_delta")

In [0]:
#verison
#DESCRIBE HISTORY loans_delta
spark.sql("SELECT * FROM order_delta VERSION AS OF 0").show(3)
#spark.sql("SELECT COUNT(*) FROM loans_delta VERSION AS OF 0").show()

#Rollback
%sql 
#RESTORE loans_delta VERSION AS OF 0

+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|              Region|Country|  ItemType|SalesChannel|OrderPriority|  OrderID|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----------+
|Middle East and N...|  Libya| Cosmetics|     Offline|            M|686800706|     8446|    437.2|  263.33|   3692591.2|2224085.18| 1468506.02|
|       North America| Canada|Vegetables|      Online|            M|185941302|     3018|   154.06|   90.93|   464953.08| 274426.74|  190526.34|
|Middle East and N...|  Libya| Baby Food|     Offline|            C|246222341|     1517|   255.28|  159.42|   387259.76| 241840.14|  145419.62|
+--------------------+-------+----------+------------+-------------+---------+---------+---------+--------+------------+----------+-----



In [0]:
%sql
--CRUD

-- describe history order_delta
SELECT * FROM order_delta VERSION AS OF 1  WHERE OrderID=686800706

Region,Country,ItemType,SalesChannel,OrderPriority,OrderID,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit
Middle East and North Africa,Libya,Cosmetics,Offline,M,686800706,8446,437.2,263.33,3692591.2,2224085.18,1468506.02
Middle East and North Africa,Libya,Cosmetics,Offline,M,686800706,8446,437.2,263.33,3692591.2,2224085.18,1468506.02


In [0]:
%sql
--SELECT * FROM order_delta5 WHERE OrderID=686800706
--DELETE FROM order_delta5 WHERE OrderID=686800706
SELECT * FROM order_delta5 VERSION AS OF 3 WHERE OrderID=686800706


Region,Country,ItemType,SalesChannel,OrderPriority,OrderID,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit
Middle East and North Africa,Libya,Cosmetics,Offline,M,686800706,8446,437.2,263.33,3692591.2,2224085.18,1468506.02


In [0]:

%sql
--CRUD
SELECT * FROM order_delta WHERE OrderID=686800706
DELETE FROM order_delta WHERE OrderID=6868007060
-- Confirm the user's data was deleted
SELECT * FROM order_delta WHERE OrderID=686800706

INSERT INTO order_delta
SELECT * FROM order_delta VERSION AS OF 0
WHERE OrderID=686800706

UPDATE  VERSION AS OF 0 SET Country = 'Libya' WHERE  OrderID=686800706


-- Vacuum deletes all files no longer needed by the current version of the table.
VACUUM  VERSION AS OF 0

CACHE SELECT * FROM  VERSION AS OF 0
OPTIMIZE  VERSION AS OF 0 ZORDER BY Country

In [0]:
#Create secret in the key vault to store the storage account key
# Got to https://community.cloud.databricks.com/?o=9003695902647151#secrets/createScope 
# provide the key vault URL and resource ID from the KeyValut Properties and give scope name of your choice
# azurelibstorageaccountky  = key vault secet name
#


dbutils.fs.mount(
  source = "wasbs://myfirstcontainer@azureifystorageaccount.blob.core.windows.net",
  mount_point = "/mnt",
  extra_configs = {"fs.azure.account.key.azureifystorageaccount.blob.core.windows.net":dbutils.secrets.get(scope = "azurelibscope", key = "azurelibstorageaccountky")})
