In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
# initializing spark session
spark = SparkSession(sc) 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/07 12:50:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/07 12:50:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# create a DF from a CSV file
creditcardDF = spark.read.format("csv") \
                         .option("header", "true") \
                         .option("inferSchema", "true") \
                         .load("SalesTraining.csv")

# display the schema
creditcardDF.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [3]:
print(creditcardDF)

DataFrame[Region: string, Country: string, Item Type: string, Sales Channel: string, Order Priority: string, Order Date: string, Order ID: int, Ship Date: string, Units Sold: int, Unit Price: double, Unit Cost: double, Total Revenue: double, Total Cost: double, Total Profit: double]


In [4]:
# display first 5 records with no truncation
creditcardDF.show(5, False)

+------------------+----------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region            |Country   |Item Type |Sales Channel|Order Priority|Order Date|Order ID |Ship Date |Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------------------+----------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Sub-Saharan Africa|Burundi   |Vegetables|Online       |M             |11/17/2010|951380240|12/20/2010|3410      |154.06    |90.93    |525344.6     |310071.3  |215273.3    |
|Europe            |Ukraine   |Cosmetics |Online       |M             |11/13/2014|270001733|1/1/2015  |8368      |437.2     |263.33   |3658489.6    |2203545.44|1454944.16  |
|Europe            |Croatia   |Beverages |Online       |C             |6/16/2016 |681941401|7/28/2016 |470       |47.45     |31.79

In [5]:
# defining your own schema
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType, LongType, IntegerType, DateType

# define the structure
schema = StructType([
    StructField("Region", StringType()),
    StructField("Country", StringType()),
    StructField("Item", StringType()),
    StructField("SalesChannel", StringType()),
    StructField("OrderPriority", StringType()),
    StructField("OrderDate", StringType()),
    StructField("OrderID", LongType()),
    StructField("ShipDate", StringType()),
    StructField("UnitsSold", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("UnitCost", FloatType()),
    StructField("TotalRevenue", DoubleType()),
    StructField("TotalCost", DoubleType()),
    StructField("TotalProfit", DoubleType())
])

# read the file by using the defined schema
creditcardDF1 = spark.read.format("csv").option("header", "true").schema(schema).load("SalesTraining.csv")

# display the schema
creditcardDF1.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- SalesChannel: string (nullable = true)
 |-- OrderPriority: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- OrderID: long (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- UnitCost: float (nullable = true)
 |-- TotalRevenue: double (nullable = true)
 |-- TotalCost: double (nullable = true)
 |-- TotalProfit: double (nullable = true)



In [6]:
# display the records
creditcardDF1.show(3, False)

22/08/07 12:50:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Region, Country, Item Type, Sales Channel, Order Priority, Order Date, Order ID, Ship Date, Units Sold, Unit Price, Unit Cost, Total Revenue, Total Cost, Total Profit
 Schema: Region, Country, Item, SalesChannel, OrderPriority, OrderDate, OrderID, ShipDate, UnitsSold, UnitPrice, UnitCost, TotalRevenue, TotalCost, TotalProfit
Expected: Item but found: Item Type
CSV file: file:///Users/kaustuvkunal/learning/BDA/Big%20Data%20Final%20Content/Session%205_Spark%20_%20SparkSQL/Session%205_Spark%20_%20SparkSQL/Faculty%20Notebook/SalesTraining.csv
+------------------+-------+----------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|Region            |Country|Item      |SalesChannel|OrderPriority|OrderDate |OrderID  |ShipDate  |UnitsSold|UnitPrice|UnitCost|TotalRevenue|TotalCost |TotalProfit|
+------------------+-------+------

In [7]:
# selecting only a few columns
from pyspark.sql.functions import col, column
import pyspark.sql.functions as F

# select a few columns
creditcardDF.select("Region", "Country", F.col("Item Type"), "Order Date", "Units Sold", "Total Revenue", F.lit('DefaultValue')).show(4, False)

+------------------+----------+----------+----------+----------+-------------+------------+
|Region            |Country   |Item Type |Order Date|Units Sold|Total Revenue|DefaultValue|
+------------------+----------+----------+----------+----------+-------------+------------+
|Sub-Saharan Africa|Burundi   |Vegetables|11/17/2010|3410      |525344.6     |DefaultValue|
|Europe            |Ukraine   |Cosmetics |11/13/2014|8368      |3658489.6    |DefaultValue|
|Europe            |Croatia   |Beverages |6/16/2016 |470       |22301.5      |DefaultValue|
|Sub-Saharan Africa|Madagascar|Fruits    |5/31/2016 |7690      |71747.7      |DefaultValue|
+------------------+----------+----------+----------+----------+-------------+------------+
only showing top 4 rows



In [8]:
# selecting all the columns
creditcardDF.select("*").show(4, False)

+------------------+----------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region            |Country   |Item Type |Sales Channel|Order Priority|Order Date|Order ID |Ship Date |Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------------------+----------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Sub-Saharan Africa|Burundi   |Vegetables|Online       |M             |11/17/2010|951380240|12/20/2010|3410      |154.06    |90.93    |525344.6     |310071.3  |215273.3    |
|Europe            |Ukraine   |Cosmetics |Online       |M             |11/13/2014|270001733|1/1/2015  |8368      |437.2     |263.33   |3658489.6    |2203545.44|1454944.16  |
|Europe            |Croatia   |Beverages |Online       |C             |6/16/2016 |681941401|7/28/2016 |470       |47.45     |31.79

In [9]:
# renaming a column
creditcardDF.columns

['Region',
 'Country',
 'Item Type',
 'Sales Channel',
 'Order Priority',
 'Order Date',
 'Order ID',
 'Ship Date',
 'Units Sold',
 'Unit Price',
 'Unit Cost',
 'Total Revenue',
 'Total Cost',
 'Total Profit']

In [10]:
# renaming the columns - "Sales Channel" to "SalesChannel" and "Item Type" to "Item"
creditcardDF2 = creditcardDF.withColumnRenamed("Sales Channel", "SalesChannel").withColumnRenamed('Item Type','Item')

In [11]:
# display records
creditcardDF2.show(3, False)

+------------------+-------+----------+------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region            |Country|Item      |SalesChannel|Order Priority|Order Date|Order ID |Ship Date |Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------------------+-------+----------+------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Sub-Saharan Africa|Burundi|Vegetables|Online      |M             |11/17/2010|951380240|12/20/2010|3410      |154.06    |90.93    |525344.6     |310071.3  |215273.3    |
|Europe            |Ukraine|Cosmetics |Online      |M             |11/13/2014|270001733|1/1/2015  |8368      |437.2     |263.33   |3658489.6    |2203545.44|1454944.16  |
|Europe            |Croatia|Beverages |Online      |C             |6/16/2016 |681941401|7/28/2016 |470       |47.45     |31.79    |22301.5      |14941

In [12]:
# change column data type
from pyspark.sql.functions import col

# change the data type from integer to long
df = creditcardDF.withColumn("Order ID", col("Order ID").cast("long"))
df.printSchema() # display the schema

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: long (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [13]:
creditcardDF.show(4, False)

+------------------+----------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region            |Country   |Item Type |Sales Channel|Order Priority|Order Date|Order ID |Ship Date |Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------------------+----------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Sub-Saharan Africa|Burundi   |Vegetables|Online       |M             |11/17/2010|951380240|12/20/2010|3410      |154.06    |90.93    |525344.6     |310071.3  |215273.3    |
|Europe            |Ukraine   |Cosmetics |Online       |M             |11/13/2014|270001733|1/1/2015  |8368      |437.2     |263.33   |3658489.6    |2203545.44|1454944.16  |
|Europe            |Croatia   |Beverages |Online       |C             |6/16/2016 |681941401|7/28/2016 |470       |47.45     |31.79

In [14]:
# adding columns to a dataframe
import pyspark.sql.functions as F

# add a new column "Register_Site" with default value "www.google.com"
dataDF = creditcardDF.withColumn("Register_Site", F.lit("www.google.com"))

# display only a few columns
dataDF.select("Region","Country", "Item Type", "Register_Site").show(3, False)

+------------------+-------+----------+--------------+
|Region            |Country|Item Type |Register_Site |
+------------------+-------+----------+--------------+
|Sub-Saharan Africa|Burundi|Vegetables|www.google.com|
|Europe            |Ukraine|Cosmetics |www.google.com|
|Europe            |Croatia|Beverages |www.google.com|
+------------------+-------+----------+--------------+
only showing top 3 rows



In [15]:
# removing columns from a DataFrame

# number of columns in a dataframe - before removing columns
print("Number of columns : ", len(dataDF.columns))

# columns - before dropping
print(list(dataDF.columns))

# drop columns - "Country", "Item Type"
datanewDF = dataDF.drop("Country", "Item Type")

# number of columns in a dataframe - after removing columns
print("Number of columns : ", len(datanewDF.columns))

# columns - after dropping
print(list(datanewDF.columns))

Number of columns :  15
['Region', 'Country', 'Item Type', 'Sales Channel', 'Order Priority', 'Order Date', 'Order ID', 'Ship Date', 'Units Sold', 'Unit Price', 'Unit Cost', 'Total Revenue', 'Total Cost', 'Total Profit', 'Register_Site']
Number of columns :  13
['Region', 'Sales Channel', 'Order Priority', 'Order Date', 'Order ID', 'Ship Date', 'Units Sold', 'Unit Price', 'Unit Cost', 'Total Revenue', 'Total Cost', 'Total Profit', 'Register_Site']


In [16]:
# arithmetic with dataframes
# number of columns in a dataframe - before a adding a column
print("Number of columns : ", len(creditcardDF.columns))

# perform arithmetic operations on a dataframe column
creditnewDF = creditcardDF.withColumn("TotalSale", col("Units Sold") * col("Unit Price"))

# number of columns in a dataframe - after adding columns
print("Number of columns : ", len(creditnewDF.columns))

# display records
creditnewDF.show(3)

Number of columns :  14
Number of columns :  15
+------------------+-------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+---------+
|            Region|Country| Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|TotalSale|
+------------------+-------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+---------+
|Sub-Saharan Africa|Burundi|Vegetables|       Online|             M|11/17/2010|951380240|12/20/2010|      3410|    154.06|    90.93|     525344.6|  310071.3|    215273.3| 525344.6|
|            Europe|Ukraine| Cosmetics|       Online|             M|11/13/2014|270001733|  1/1/2015|      8368|     437.2|   263.33|    3658489.6|2203545.44|  1454944.16|3658489.6|
|            Europe|Croatia| Beverages|       O

In [17]:
# filter a dataframe

creditcardDF.where(col("Region") == "Asia").show(5)

+------+-----------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|Region|    Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------+-----------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|  Asia|   Malaysia|         Snacks|      Offline|             M| 10/6/2012|175033080|11/5/2012|      5033|    152.58|    97.44|    767935.14| 490415.52|   277519.62|
|  Asia| Uzbekistan|Office Supplies|      Offline|             L| 3/10/2012|276595246|3/15/2012|      9535|    651.21|   524.96|   6209287.35| 5005493.6|  1203793.75|
|  Asia|      Nepal|     Vegetables|      Offline|             C|  6/2/2014|443121373|6/19/2014|      8316|    154.06|    90.93|   1281162.96| 756173.88|   524989.08

In [18]:
# filter a dataframe - multiple columns

creditcardDF.where((col("Region") == "Asia") & (col("Sales Channel") == "Online")).show(5)

+------+-----------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|Region|    Country|Item Type|Sales Channel|Order Priority|Order Date| Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------+-----------+---------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|  Asia|      India|   Fruits|       Online|             H| 7/29/2010|658348691|8/22/2010|      8862|      9.33|     6.92|     82682.46|  61325.04|    21357.42|
|  Asia|Philippines|Baby Food|       Online|             L| 2/23/2014|160127294|3/23/2014|      4079|    255.28|   159.42|   1041287.12| 650274.18|   391012.94|
|  Asia| Bangladesh|Baby Food|       Online|             H| 8/17/2011|254927718| 9/7/2011|      7632|    255.28|   159.42|   1948296.96|1216693.44|   731603.52|
|  Asia|    Vietnam|Baby Food|    

In [19]:
# dropping rows
testDF = [[1, "January"], [2, "February"], [1, "January"], [3, "March"], [3, "March"], [3, "March"], [4, "April"], [4, "April"], [5, "May"], [5, "May"],
          [4, "April"], [6, "June"], [5, "April"]]

# import the modules
from pyspark.sql.types import *

# define the schema
schema = StructType([StructField("ID", IntegerType()),StructField("Month", StringType())])

# create the dataframe by applying schema
df = spark.createDataFrame(testDF,schema=schema) 

# display the records
df.show()

+---+--------+
| ID|   Month|
+---+--------+
|  1| January|
|  2|February|
|  1| January|
|  3|   March|
|  3|   March|
|  3|   March|
|  4|   April|
|  4|   April|
|  5|     May|
|  5|     May|
|  4|   April|
|  6|    June|
|  5|   April|
+---+--------+



[Stage 12:>                                                         (0 + 1) / 1]                                                                                

In [20]:
# display distinct rows
df.distinct().show()

+---+--------+
| ID|   Month|
+---+--------+
|  1| January|
|  2|February|
|  3|   March|
|  4|   April|
|  5|     May|
|  5|   April|
|  6|    June|
+---+--------+



In [21]:
# drop duplicate records based a column value
df.dropDuplicates(['Month']).show()

# drop duplicate records based multiple column values
df.dropDuplicates(['Month', 'ID']).show()

+---+--------+
| ID|   Month|
+---+--------+
|  1| January|
|  2|February|
|  3|   March|
|  4|   April|
|  5|     May|
|  6|    June|
+---+--------+

+---+--------+
| ID|   Month|
+---+--------+
|  1| January|
|  2|February|
|  3|   March|
|  4|   April|
|  5|     May|
|  6|    June|
|  5|   April|
+---+--------+



In [22]:
# rename existing columns
newDF = creditcardDF.withColumnRenamed("Unit Price", "UnitPrice").withColumnRenamed("Total Profit", "Total_Profit")

creditcardDF.show(3) # display records

from pyspark.sql.functions import expr # define the modules

# using select expression 
newDF.select("Region", "Country",expr("CASE WHEN Total_Profit > 300000 THEN  'Good' ELSE 'Average' END AS value_desc")).show(3)

+------------------+-------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|            Region|Country| Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------------------+-------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Sub-Saharan Africa|Burundi|Vegetables|       Online|             M|11/17/2010|951380240|12/20/2010|      3410|    154.06|    90.93|     525344.6|  310071.3|    215273.3|
|            Europe|Ukraine| Cosmetics|       Online|             M|11/13/2014|270001733|  1/1/2015|      8368|     437.2|   263.33|    3658489.6|2203545.44|  1454944.16|
|            Europe|Croatia| Beverages|       Online|             C| 6/16/2016|681941401| 7/28/2016|       470|     47.45|    31.79|      22301.5

In [23]:
from pyspark.sql.types import *   # import the libraries

# define a list
list_data = [["Bill Gates",23],["Henry Ford", None], ["Tim Cook", None]]

# define the schema
schema = StructType([StructField("Name", StringType()),StructField("Experience", IntegerType())])

# create a dataframe 
df = spark.createDataFrame(list_data,schema=schema)

df.show() # display the dataframe

+----------+----------+
|      Name|Experience|
+----------+----------+
|Bill Gates|        23|
|Henry Ford|      null|
|  Tim Cook|      null|
+----------+----------+



In [24]:
# drop null value rows
df.na.drop().show()

+----------+----------+
|      Name|Experience|
+----------+----------+
|Bill Gates|        23|
+----------+----------+



In [25]:
# fill null value with a constant value
df.fillna(34).show()

+----------+----------+
|      Name|Experience|
+----------+----------+
|Bill Gates|        23|
|Henry Ford|        34|
|  Tim Cook|        34|
+----------+----------+



In [26]:
# replace a single value
df.na.replace('Bill Gates', 'Satya Nadella').show()

+-------------+----------+
|         Name|Experience|
+-------------+----------+
|Satya Nadella|        23|
|   Henry Ford|      null|
|     Tim Cook|      null|
+-------------+----------+



In [27]:
# replace multiple values and also fill 'null' with a constant value
df.na.replace(['Bill Gates', 'Tim Cook'], ['Satya N', 'Time'], 'Name').fillna(40).show()

+----------+----------+
|      Name|Experience|
+----------+----------+
|   Satya N|        23|
|Henry Ford|        40|
|      Time|        40|
+----------+----------+



In [28]:
# rename the existing columns - "Item Type" to "ItemType" and "Total Profit" to "Total_Profit"
newDF = creditcardDF.withColumnRenamed("Item Type", "ItemType").withColumnRenamed("Total Profit", "Total_Profit")

# find maximum total_profit for each region and alias the column to "Maximum"
newDF.groupBy("Region").max("Total_Profit").alias("Maximum").show(10, False)

+---------------------------------+-----------------+
|Region                           |max(Total_Profit)|
+---------------------------------+-----------------+
|Middle East and North Africa     |1682887.73       |
|Australia and Oceania            |1631943.31       |
|Europe                           |1726181.36       |
|Sub-Saharan Africa               |1571089.32       |
|Central America and the Caribbean|1631422.21       |
|North America                    |1541620.46       |
|Asia                             |1725485.88       |
+---------------------------------+-----------------+



In [29]:
# count of items in each region
newDF.groupBy("Region").agg({'ItemType':'count'}).show(10, False)

+---------------------------------+---------------+
|Region                           |count(ItemType)|
+---------------------------------+---------------+
|Middle East and North Africa     |132            |
|Australia and Oceania            |76             |
|Europe                           |255            |
|Sub-Saharan Africa               |247            |
|Central America and the Caribbean|93             |
|North America                    |16             |
|Asia                             |130            |
+---------------------------------+---------------+



In [30]:
from pyspark.sql.functions import avg # include the library

# find average of column - "Total_Profit" 
newDF.select(avg("Total_Profit").alias("Average Profit")).show()

+-----------------+
|   Average Profit|
+-----------------+
|392951.5061011591|
+-----------------+



In [31]:
# include the library
from pyspark.sql.functions import col

# order the records by region - ascending
creditcardDF.orderBy('Region', ascending=True).select("Region","Country", "Item Type", "Order ID", "Total Profit").show(3)

+------+-----------+---------------+---------+------------+
|Region|    Country|      Item Type| Order ID|Total Profit|
+------+-----------+---------------+---------+------------+
|  Asia| Uzbekistan|Office Supplies|276595246|  1203793.75|
|  Asia|South Korea|         Fruits|769205892|     9572.52|
|  Asia|   Malaysia|         Snacks|175033080|   277519.62|
+------+-----------+---------------+---------+------------+
only showing top 3 rows



In [32]:
# include the library
from pyspark.sql.functions import col

# order the records by region - descending
creditcardDF.orderBy('Region', ascending=False).select("Region","Country", "Item Type", "Order ID", "Total Profit").show(3)

+------------------+--------------------+---------------+---------+------------+
|            Region|             Country|      Item Type| Order ID|Total Profit|
+------------------+--------------------+---------------+---------+------------+
|Sub-Saharan Africa|            Botswana|        Clothes|680517470|   668083.68|
|Sub-Saharan Africa|Central African R...|Office Supplies|668599021|   273078.75|
|Sub-Saharan Africa|            Tanzania|  Personal Care|400304734|   198500.26|
+------------------+--------------------+---------------+---------+------------+
only showing top 3 rows



In [33]:
# cache and persist
from pyspark import StorageLevel

# cache the dataframe in in-memory
cacheDF = creditcardDF.cache()

# read the records from cache
cacheDF.select("Region", "Country", "Item Type", "Sales Channel", "Order Priority", \
               "Order Date", "Order ID", "Ship Date").show(4, truncate=False)

+------------------+----------+----------+-------------+--------------+----------+---------+----------+
|Region            |Country   |Item Type |Sales Channel|Order Priority|Order Date|Order ID |Ship Date |
+------------------+----------+----------+-------------+--------------+----------+---------+----------+
|Sub-Saharan Africa|Burundi   |Vegetables|Online       |M             |11/17/2010|951380240|12/20/2010|
|Europe            |Ukraine   |Cosmetics |Online       |M             |11/13/2014|270001733|1/1/2015  |
|Europe            |Croatia   |Beverages |Online       |C             |6/16/2016 |681941401|7/28/2016 |
|Sub-Saharan Africa|Madagascar|Fruits    |Online       |L             |5/31/2016 |566935575|6/7/2016  |
+------------------+----------+----------+-------------+--------------+----------+---------+----------+
only showing top 4 rows



In [34]:
# cache and persist
from pyspark import StorageLevel

# persist the dataframe in both memo
persistDF = creditcardDF.persist(StorageLevel.MEMORY_AND_DISK)

# read the records from saved dataframe
persistDF.select("Region", "Country", "Item Type", "Sales Channel", "Order Priority", \
               "Order Date", "Order ID").show(4, truncate=False)

22/08/07 12:50:54 WARN CacheManager: Asked to cache already cached data.
+------------------+----------+----------+-------------+--------------+----------+---------+
|Region            |Country   |Item Type |Sales Channel|Order Priority|Order Date|Order ID |
+------------------+----------+----------+-------------+--------------+----------+---------+
|Sub-Saharan Africa|Burundi   |Vegetables|Online       |M             |11/17/2010|951380240|
|Europe            |Ukraine   |Cosmetics |Online       |M             |11/13/2014|270001733|
|Europe            |Croatia   |Beverages |Online       |C             |6/16/2016 |681941401|
|Sub-Saharan Africa|Madagascar|Fruits    |Online       |L             |5/31/2016 |566935575|
+------------------+----------+----------+-------------+--------------+----------+---------+
only showing top 4 rows



In [35]:
# coalesce vs repartition
print("Number of partitions : ", creditcardDF.rdd.getNumPartitions())

# increase the number of partitions
cDF = creditcardDF.repartition(2)

# number of partitions after repatitioning
print("Number of partitions : ", cDF.rdd.getNumPartitions())

# reduce the number of partitions
cDF = cDF.coalesce(1)

# number of partitions after coalesce
print("Number of partitions : ", cDF.rdd.getNumPartitions())

Number of partitions :  1
Number of partitions :  2
Number of partitions :  1


In [36]:
# aggregates the Item Type count by region, brings the data to a single partition
writeDF = newDF.groupBy("Region").agg({'ItemType':'count'}).coalesce(1)  

# write to DBFS - mode: "overwrite" replaces the existing file and "append" adds the content
writeDF.write.option("header","true").option("sep",",").mode("overwrite").csv("Aggregate/")

In [37]:
# %fs ls "Aggregate/"

In [39]:
# read the csv file
# note : the name of part file has to be manually added 
newDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true") \
   .load("Aggregate/part-00000-2917f169-8e91-460d-9355-4027cb1cf460-c000.csv")

# display the records
newDF.show(10, False)

+---------------------------------+---------------+
|Region                           |count(ItemType)|
+---------------------------------+---------------+
|Middle East and North Africa     |132            |
|Australia and Oceania            |76             |
|Europe                           |255            |
|Sub-Saharan Africa               |247            |
|Central America and the Caribbean|93             |
|North America                    |16             |
|Asia                             |130            |
+---------------------------------+---------------+



In [40]:
# spark SQL
# create a DataFrame
from pyspark.sql.types import *   # import the library
leader_data = [["Middle East and North Africa","Mohammed Saif"],["Australia and Oceania", "George Carlin"], \
               ["Europe", "Stuart Broad"], ["Sub-Saharan Africa", "Abdalla"], ["Central America and the Caribbean", "Chris Gayle"], \
               ["North America", "George Bush"], ["Asia", "Tatyaso Martin"]]

# define the schema
schema = StructType([StructField("Region", StringType()), StructField("SalesPerson", StringType())])

# create a dataframe and display the records
df = spark.createDataFrame(leader_data,schema=schema)
df.show(10, False)

+---------------------------------+--------------+
|Region                           |SalesPerson   |
+---------------------------------+--------------+
|Middle East and North Africa     |Mohammed Saif |
|Australia and Oceania            |George Carlin |
|Europe                           |Stuart Broad  |
|Sub-Saharan Africa               |Abdalla       |
|Central America and the Caribbean|Chris Gayle   |
|North America                    |George Bush   |
|Asia                             |Tatyaso Martin|
+---------------------------------+--------------+



In [42]:
df.createOrReplaceTempView("sales_table")  # convert dataframe to view
spark.sql("select * from sales_table").show(10, False)

+---------------------------------+--------------+
|Region                           |SalesPerson   |
+---------------------------------+--------------+
|Middle East and North Africa     |Mohammed Saif |
|Australia and Oceania            |George Carlin |
|Europe                           |Stuart Broad  |
|Sub-Saharan Africa               |Abdalla       |
|Central America and the Caribbean|Chris Gayle   |
|North America                    |George Bush   |
|Asia                             |Tatyaso Martin|
+---------------------------------+--------------+



In [43]:
spark.sql("select * from sales_table where Region = 'Asia'").show(10, False)

+------+--------------+
|Region|SalesPerson   |
+------+--------------+
|Asia  |Tatyaso Martin|
+------+--------------+



In [44]:
spark.sql("select * from sales_table where SalesPerson like '%George%'").show(10, False)

+---------------------+-------------+
|Region               |SalesPerson  |
+---------------------+-------------+
|Australia and Oceania|George Carlin|
|North America        |George Bush  |
+---------------------+-------------+



In [45]:
spark.sql("select count(*) from sales_table").show()

+--------+
|count(1)|
+--------+
|       7|
+--------+



In [46]:
creditcardDF.createOrReplaceTempView("creditcard")

spark.sql("select * from creditcard").show(1, False)

+------------------+-------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region            |Country|Item Type |Sales Channel|Order Priority|Order Date|Order ID |Ship Date |Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------------------+-------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Sub-Saharan Africa|Burundi|Vegetables|Online       |M             |11/17/2010|951380240|12/20/2010|3410      |154.06    |90.93    |525344.6     |310071.3  |215273.3    |
+------------------+-------+----------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
only showing top 1 row



In [47]:
# renaming a column using DSL
newDF = creditcardDF.withColumnRenamed("Total Revenue", "TotalRevenue")

# create a temp view

newDF.createOrReplaceTempView("creditcard")

# apply aggregations on the table data
spark.sql("select Region, max(TotalRevenue) from creditcard group by Region").show(truncate=False)

+---------------------------------+-----------------+
|Region                           |max(TotalRevenue)|
+---------------------------------+-----------------+
|Middle East and North Africa     |6160781.13       |
|Australia and Oceania            |6580454.69       |
|Europe                           |6617209.54       |
|Sub-Saharan Africa               |6263026.44       |
|Central America and the Caribbean|6354579.43       |
|North America                    |6216247.54       |
|Asia                             |6557065.24       |
+---------------------------------+-----------------+



In [48]:
spark.sql("select Region, max(TotalRevenue) from creditcard group by Region order by Region").show(truncate=False)

+---------------------------------+-----------------+
|Region                           |max(TotalRevenue)|
+---------------------------------+-----------------+
|Asia                             |6557065.24       |
|Australia and Oceania            |6580454.69       |
|Central America and the Caribbean|6354579.43       |
|Europe                           |6617209.54       |
|Middle East and North Africa     |6160781.13       |
|North America                    |6216247.54       |
|Sub-Saharan Africa               |6263026.44       |
+---------------------------------+-----------------+



In [49]:
spark.sql("select Region, max(TotalRevenue) from creditcard group by Region order by Region desc").show(truncate=False)

+---------------------------------+-----------------+
|Region                           |max(TotalRevenue)|
+---------------------------------+-----------------+
|Sub-Saharan Africa               |6263026.44       |
|North America                    |6216247.54       |
|Middle East and North Africa     |6160781.13       |
|Europe                           |6617209.54       |
|Central America and the Caribbean|6354579.43       |
|Australia and Oceania            |6580454.69       |
|Asia                             |6557065.24       |
+---------------------------------+-----------------+



In [50]:
# join (inner) creditcard and sales_table, display the results
spark.sql("""select a.Region, a.Country, b.SalesPerson
       from creditcard a
       join sales_table b
       on trim(a.Region) = trim(b.Region)""").show(5, False)

+----------------------------+--------------------+-------------+
|Region                      |Country             |SalesPerson  |
+----------------------------+--------------------+-------------+
|Middle East and North Africa|United Arab Emirates|Mohammed Saif|
|Middle East and North Africa|Azerbaijan          |Mohammed Saif|
|Middle East and North Africa|Yemen               |Mohammed Saif|
|Middle East and North Africa|Bahrain             |Mohammed Saif|
|Middle East and North Africa|Jordan              |Mohammed Saif|
+----------------------------+--------------------+-------------+
only showing top 5 rows



In [51]:
# join (inner) creditcard and sales_table, apply a where condition, display the results
df = spark.sql("""select a.Region, a.Country, b.SalesPerson
       from creditcard a
       join sales_table b
       on trim(a.Region) = trim(b.Region)
       where trim(a.Region) = "Asia"
       """).show(5, False)

+------+--------+--------------+
|Region|Country |SalesPerson   |
+------+--------+--------------+
|Asia  |Nepal   |Tatyaso Martin|
|Asia  |Mongolia|Tatyaso Martin|
|Asia  |Brunei  |Tatyaso Martin|
|Asia  |Laos    |Tatyaso Martin|
|Asia  |Mongolia|Tatyaso Martin|
+------+--------+--------------+
only showing top 5 rows



In [52]:
# write the results in to DBFS
df = spark.sql("""select a.Region, a.Country, b.SalesPerson
       from creditcard a
       join sales_table b
       on trim(a.Region) = trim(b.Region)
       where trim(a.Region) = "Asia"
       """)


df.coalesce(1).write.option("header","true").mode("overwrite").csv("spark/")

In [53]:
## %fs ls "spark/" 

In [56]:
# read the csv file from stored location
newDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true") \
.load("spark/part-00000-8c322dab-e10d-438d-abeb-9024e2cf7a7b-c000.csv")

newDF.show(10, False)

+------+----------+--------------+
|Region|Country   |SalesPerson   |
+------+----------+--------------+
|Asia  |Nepal     |Tatyaso Martin|
|Asia  |Mongolia  |Tatyaso Martin|
|Asia  |Brunei    |Tatyaso Martin|
|Asia  |Laos      |Tatyaso Martin|
|Asia  |Mongolia  |Tatyaso Martin|
|Asia  |Indonesia |Tatyaso Martin|
|Asia  |Bhutan    |Tatyaso Martin|
|Asia  |Tajikistan|Tatyaso Martin|
|Asia  |India     |Tatyaso Martin|
|Asia  |Maldives  |Tatyaso Martin|
+------+----------+--------------+
only showing top 10 rows

