## Let's practice PySpark by cooking up some code

In [1]:
from pyspark.sql import SparkSession

In [2]:
%config Completer.use_jedi = False

In [3]:
spark = SparkSession.builder.master('local[1]').appName('SparkByExample.com').getOrCreate()

In [4]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5])
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [5]:
type(rdd)

pyspark.rdd.RDD

**Note:** RDDs are fault-tolerant, immutable, distributed collections of objects, divided into logical partitions that are computed on different nodes of the cluster.

## RDD
* Created using `parallelize()`
* Created from a text file using `textFile()`
* Operations
    * Transformations: lazy operations; instead of updating the current RDD, these operations return a new RDD. Examples: `flatMap(), map(), reduceByKey(), filter(), sortByKey()`
    * Actions: operations that trigger computation and return RDD values to the driver. Examples: `count(), collect(), first(), max(), reduce()`

In [6]:
# collect() is an action: it returns all elements of the RDD to the driver as a list
rddcollect = rdd.collect()
rddcollect

[1, 2, 3, 4, 5]

In [7]:
print(f'Number of partitions: {str(rdd.getNumPartitions())}')

Number of partitions: 1


In [8]:
print(f'Action : First Element: {str(rdd.first())}')

Action : First Element: 1


In [9]:
print(f'is Empty RDD: {str(rdd.isEmpty())}')

is Empty RDD: False


## RDD Repartition vs Coalesce
repartition() can increase or decrease the number of RDD/DataFrame partitions, whereas coalesce() can only decrease it, which it does more efficiently.

👇️ repartition() performs a full shuffle, moving data across all partitions, which is expensive; coalesce() avoids a full shuffle by merging existing partitions. Use both as sparingly as possible.👇️

In [10]:
rdd = spark.sparkContext.parallelize(range(0, 20))
print(f'default partitions : {str(rdd.getNumPartitions())}')

default partitions : 1


In [11]:
rdd1 = spark.sparkContext.parallelize(range(0,36), 6)
print(f'configured partitions : {str(rdd1.getNumPartitions())}')

configured partitions : 6


In [12]:
# minPartitions=10 is only a lower bound; Hadoop's input splitting can produce more (11 here)
rddfromfile = spark.sparkContext.textFile('textfile.txt', 10)
rddfromfile

textfile.txt MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:0

In [13]:
print(f'textfile partitions : {str(rddfromfile.getNumPartitions())}')

textfile partitions : 11


In [14]:
! rm -rf partition
rdd1.saveAsTextFile('partition')

In [15]:
rdd2 = rdd1.repartition(4)

In [16]:
print(f'Repartitioned size : {str(rdd2.getNumPartitions())}')

Repartitioned size : 4


In [17]:
! rm -rf repartitioned
rdd2.saveAsTextFile('repartitioned')

In [18]:
rdd3 = rdd1.coalesce(4)
print(f'coalesce partitions : {str(rdd3.getNumPartitions())}')

coalesce partitions : 4


In [19]:
!rm -rf coalesce_files
rdd3.saveAsTextFile('coalesce_files')

In [20]:
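# Without shuffle=True, coalesce() cannot increase the partition count, so rdd4 keeps rdd1's 6 partitions
rdd4 = rdd1.coalesce(10)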

## DataFrame repartition() vs coalesce()

👆️ Most DataFrame constructors, such as createDataFrame(), take no argument for the number of partitions. Spark then uses its default parallelism (the same default that parallelize() uses above) to split the data.👆️

In [21]:
df = spark.range(0,20)
df

DataFrame[id: bigint]

In [22]:
df.collect()

[Row(id=0),
 Row(id=1),
 Row(id=2),
 Row(id=3),
 Row(id=4),
 Row(id=5),
 Row(id=6),
 Row(id=7),
 Row(id=8),
 Row(id=9),
 Row(id=10),
 Row(id=11),
 Row(id=12),
 Row(id=13),
 Row(id=14),
 Row(id=15),
 Row(id=16),
 Row(id=17),
 Row(id=18),
 Row(id=19)]

In [23]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [24]:
print(f'Dataframe partitions : {str(df.rdd.getNumPartitions())}')

Dataframe partitions : 1


In [25]:
df.write.mode('overwrite').csv('dataframe_partitions.csv')

In [26]:
df2 = df.repartition(6)
print(f'repartitioned dataframe partitions : {str(df2.rdd.getNumPartitions())}')

repartitioned dataframe partitions : 6


In [27]:
df2.write.mode('overwrite').csv('df2')

In [28]:
df3 = df.coalesce(2)
# df has only 1 partition and coalesce() cannot increase the count, so this stays at 1
print(f'coalesced dataframe partitions : {str(df3.rdd.getNumPartitions())}')

coalesced dataframe partitions : 1


In [29]:
df4 = df.groupBy('id').count()
print(df4.rdd.getNumPartitions())

1


In [30]:
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1632466828817'),
 ('spark.app.startTime', '1632466827726'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.name', 'SparkByExample.com'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '36707'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[1]'),
 ('spark.sql.warehouse.dir',
  'file:/home/srinidhi/Documents/pyspark%20practice/spark-warehouse'),
 ('spark.driver.host', 'fedora')]

## RDD Broadcast variables
* PySpark breaks a job into stages separated by distributed shuffles, and actions are executed within a stage.
* Stages are further broken into tasks.
* Spark broadcasts the common (reusable) data needed by tasks within each stage.
* The broadcast data is cached in serialized form and deserialized before each task executes.

In [31]:
states = {'NY': 'New York', 'CA': 'California'}

In [32]:
broadcastStates = spark.sparkContext.broadcast(states)

In [33]:
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

In [34]:
rdd = spark.sparkContext.parallelize(data)

In [35]:
def state_convert(code):
    return broadcastStates.value[code]

In [36]:
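# map() is lazy, so nothing runs yet; collecting this would fail on "FL", which is missing from states (fixed in the next cell)
result = rdd.map(lambda x : (x[0], x[1], x[2], state_convert(x[3])))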

In [37]:
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)

[('James', 'Smith', 'USA', 'California'), ('Michael', 'Rose', 'USA', 'New York'), ('Robert', 'Williams', 'USA', 'California'), ('Maria', 'Jones', 'USA', 'Florida')]


## DataFrame Broadcast variables

In [38]:
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcastStates.value[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+



## RDD Accumulator
* Accumulators perform sum and counter operations, similar to MapReduce counters. They are shared by all executors, which update them through associative and commutative aggregation.
* From a task's point of view, accumulators are write-only, initialize-once variables: tasks running on workers can add to them, and their updates propagate automatically to the driver program. Only the driver program can read an accumulator, through its `value` property.
* Use `accumulator()` on the SparkContext to create an accumulator. Accumulators for custom types can be created with PySpark's `AccumulatorParam` class.
* `add()` adds/updates a value in the accumulator.
* The `value` property retrieves the accumulator's value.

In [39]:
accum = spark.sparkContext.accumulator(0)

In [40]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5])

In [41]:
rdd.foreach(lambda x: accum.add(x))

In [42]:
print(accum.value)

15


**Note:** the function passed to `rdd.foreach()` runs on the workers, while `accum.value` is read in the PySpark driver program.

In [43]:
accuSum=spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum+=x
rdd.foreach(countFun)
print(accuSum.value)

15


In [44]:
accumCount=spark.sparkContext.accumulator(0)
rdd2=spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(lambda x:accumCount.add(1))
print(accumCount.value)

5


## Aggregate functions 
* PySpark provides built-in standard aggregate functions defined in the DataFrame API; these come in handy when we need to perform aggregate operations on DataFrame columns. Aggregate functions operate on a group of rows and calculate a single return value for every group.

In [45]:
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]

In [46]:
schema = ["employee_name", "department", "salary"]

In [47]:
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [48]:
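from pyspark.sql.functions import *  # note: this shadows Python built-ins such as max(), min() and sum()
# approx_count_distinct() returns an approximate count of distinct items in a group
df.select(approx_count_distinct('salary')).collect()[0][0]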

6

In [49]:
df.select(avg('salary')).collect()[0][0]

3400.0

In [50]:
# collect_list() returns all values from an input column, including duplicates.
df.select(collect_list('salary')).show(truncate=False)

+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+



In [51]:
# collect_set() function returns all values from an input column with duplicate values eliminated.
df.select(collect_set('salary')).show(truncate=False)

+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+



In [52]:
# countDistinct() returns the number of distinct elements in a column
df.select(countDistinct('salary')).show(truncate=False)

+----------------------+
|count(DISTINCT salary)|
+----------------------+
|6                     |
+----------------------+



### Grouping
* grouping() indicates whether a given column in a GROUP BY list is aggregated or not, returning 1 for aggregated and 0 for not aggregated in the result. It only works together with cube(), rollup() or GROUPING SETS; calling grouping() directly on the salary column in a plain select raises an AnalysisException.

In [53]:
# first() returns the first element in a column; when ignoreNulls is set to True, it returns the first non-null element.
df.select(first('salary')).show(truncate=False)

+-------------+
|first(salary)|
+-------------+
|3000         |
+-------------+



In [54]:
# last() returns the last element in a column; when ignoreNulls is set to True, it returns the last non-null element.
df.select(last('salary')).show(truncate=False)

+------------+
|last(salary)|
+------------+
|4100        |
+------------+



In [55]:
# kurtosis() function returns the kurtosis of the values in a group.
df.select(kurtosis('salary')).show(truncate=False)

+-------------------+
|kurtosis(salary)   |
+-------------------+
|-0.6467803030303032|
+-------------------+



In [56]:
# max() function returns the maximum value in a column.
df.select(max('salary')).show(truncate=False)

+-----------+
|max(salary)|
+-----------+
|4600       |
+-----------+



In [57]:
# mean() returns the average of the values in a column; it is an alias for avg().
df.select(mean('salary')).show(truncate=False)

+-----------+
|avg(salary)|
+-----------+
|3400.0     |
+-----------+



In [58]:
# skewness() function returns the skewness of the values in a group.
df.select(skewness('salary')).show(truncate=False)

+--------------------+
|skewness(salary)    |
+--------------------+
|-0.12041791181069571|
+--------------------+



**Note:**
stddev() is an alias for stddev_samp().

stddev_samp() function returns the sample standard deviation of values in a column.

stddev_pop() function returns the population standard deviation of the values in a column.

In [59]:
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)

+-------------------+-------------------+------------------+
|stddev_samp(salary)|stddev_samp(salary)|stddev_pop(salary)|
+-------------------+-------------------+------------------+
|765.9416862050705  |765.9416862050705  |726.636084983398  |
+-------------------+-------------------+------------------+



In [60]:
# sumDistinct() returns the sum of all distinct values in a column (Spark 3.2+ deprecates it in favour of sum_distinct()).

df.select(sumDistinct("salary")).show(truncate=False)

+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+



**Note:**
variance() is an alias for var_samp().

var_samp() function returns the unbiased variance of the values in a column.

var_pop() function returns the population variance of the values in a column.
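The sample/population distinction is just the divisor, n - 1 versus n. A plain-Python sketch (no Spark needed) that recomputes the salary statistics from this section by hand and cross-checks them against the standard `statistics` module:

```python
import math
import statistics

# Salaries from simpleData above
salaries = [3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]
n = len(salaries)
mean = sum(salaries) / n                         # 3400.0
sum_sq = sum((x - mean) ** 2 for x in salaries)  # sum of squared deviations

var_pop = sum_sq / n            # var_pop(): divide by n
var_samp = sum_sq / (n - 1)     # var_samp() / variance(): divide by n - 1
std_pop = math.sqrt(var_pop)    # stddev_pop()
std_samp = math.sqrt(var_samp)  # stddev_samp() / stddev()

print(var_pop)  # 528000.0
print(var_samp, std_samp, std_pop)

# The standard library uses the same two conventions
assert math.isclose(statistics.pvariance(salaries), var_pop)
assert math.isclose(statistics.variance(salaries), var_samp)
```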

In [61]:
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)

+-----------------+-----------------+---------------+
|var_samp(salary) |var_samp(salary) |var_pop(salary)|
+-----------------+-----------------+---------------+
|586666.6666666666|586666.6666666666|528000.0       |
+-----------------+-----------------+---------------+



## Window Functions 🔥🔥
* PySpark window functions calculate results such as rank and row number over a range of input rows.
* To operate on a group, first partition the data with `Window.partitionBy()`; for the row number and rank functions, also order the rows within each partition with `orderBy()`.

### row_number Window Function 👆️👍
* row_number() assigns a sequential number, starting from 1, to each row within its window partition. Rows that tie on the ordering column (such as the two Sales rows at 3000) are numbered in an arbitrary order.

In [62]:
from pyspark.sql.window import Window

In [63]:
windowSpec = Window.partitionBy('department').orderBy('salary')

In [64]:
df.withColumn('row_number', row_number().over(windowSpec)).show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
+-------------+----------+------+----------+



### rank Window Function 👆️
* rank() window function is used to provide a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

In [65]:
df.withColumn('rank', rank().over(windowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|James        |Sales     |3000  |1   |
|James        |Sales     |3000  |1   |
|Robert       |Sales     |4100  |3   |
|Saif         |Sales     |4100  |3   |
|Michael      |Sales     |4600  |5   |
|Maria        |Finance   |3000  |1   |
|Scott        |Finance   |3300  |2   |
|Jen          |Finance   |3900  |3   |
|Kumar        |Marketing |2000  |1   |
|Jeff         |Marketing |3000  |2   |
+-------------+----------+------+----+



### dense_rank Window Function 🎯
* `dense_rank()` ranks rows within a window partition without any gaps. It is similar to rank(); the difference is that rank() leaves gaps in the ranking when there are ties.
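The gap behaviour is easy to reproduce outside Spark. This plain-Python sketch ranks the Sales salaries from the tables in this section both ways; the helper names are hypothetical, and the list lookups make it quadratic, so it is for illustration only.

```python
def rank(values):
    """Like rank(): ties share a rank, and the next rank skips ahead by the tie count."""
    ordered = sorted(values)
    return [ordered.index(v) + 1 for v in ordered]


def dense_rank(values):
    """Like dense_rank(): ties share a rank, and the next rank is always +1."""
    distinct = sorted(set(values))
    return [distinct.index(v) + 1 for v in sorted(values)]


sales = [3000, 4600, 4100, 3000, 4100]
print(rank(sales))        # [1, 1, 3, 3, 5]
print(dense_rank(sales))  # [1, 1, 2, 2, 3]
```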

In [66]:
df.withColumn('Dense_Rank',dense_rank().over(windowSpec)).show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|Dense_Rank|
+-------------+----------+------+----------+
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |1         |
|Robert       |Sales     |4100  |2         |
|Saif         |Sales     |4100  |2         |
|Michael      |Sales     |4600  |3         |
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
+-------------+----------+------+----------+



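### percent_rank Window Function 😎
* `percent_rank()` returns the relative rank of each row within its window partition, computed as (rank - 1) / (number of rows in the partition - 1), so values range from 0.0 to 1.0.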
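`percent_rank()` is (rank - 1) / (rows in the partition - 1). A plain-Python sketch of that formula (the helper is hypothetical) that reproduces the Sales and Finance values in the output below; returning 0.0 for a one-row partition is our assumption of how a single row is handled.

```python
def percent_rank(values):
    """(rank - 1) / (rows - 1) for each value in ascending order; 0.0 for a single row."""
    ordered = sorted(values)
    n = len(ordered)
    if n == 1:
        return [0.0]
    # ordered.index(v) counts the rows strictly before v, i.e. rank - 1
    return [ordered.index(v) / (n - 1) for v in ordered]


print(percent_rank([3000, 4600, 4100, 3000, 4100]))  # Sales: [0.0, 0.0, 0.5, 0.5, 1.0]
print(percent_rank([3000, 3300, 3900]))              # Finance: [0.0, 0.5, 1.0]
```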

In [67]:
df.withColumn('Percent_Rank', percent_rank().over(windowSpec)).show(truncate=False)

+-------------+----------+------+------------+
|employee_name|department|salary|Percent_Rank|
+-------------+----------+------+------------+
|James        |Sales     |3000  |0.0         |
|James        |Sales     |3000  |0.0         |
|Robert       |Sales     |4100  |0.5         |
|Saif         |Sales     |4100  |0.5         |
|Michael      |Sales     |4600  |1.0         |
|Maria        |Finance   |3000  |0.0         |
|Scott        |Finance   |3300  |0.5         |
|Jen          |Finance   |3900  |1.0         |
|Kumar        |Marketing |2000  |0.0         |
|Jeff         |Marketing |3000  |1.0         |
+-------------+----------+------+------------+



### ntile Window Function 😃
* `ntile(n)` divides the ordered rows of each window partition into `n` buckets of as equal a size as possible and returns each row's bucket number. In the example below we pass 2, so each partition is split into two buckets and the result is either 1 or 2.
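The bucket sizes can be sketched in plain Python. The helper below is hypothetical and assumes that, when the rows do not divide evenly, the first (rows % n) buckets each get one extra row; that rule reproduces the Sales, Finance and Marketing values in both ntile outputs of this section.

```python
def ntile(n_buckets, row_count):
    """Bucket number for each row of an ordered partition, as evenly as possible.

    The first (row_count % n_buckets) buckets get one extra row.
    """
    base, extra = divmod(row_count, n_buckets)
    buckets = []
    for bucket in range(1, n_buckets + 1):
        size = base + (1 if bucket <= extra else 0)
        buckets.extend([bucket] * size)
    return buckets


print(ntile(2, 5))  # Sales, ntile(2): [1, 1, 1, 2, 2]
print(ntile(3, 5))  # Sales, ntile(3): [1, 1, 2, 2, 3]
print(ntile(3, 2))  # Marketing, ntile(3): [1, 2]
```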

In [68]:
df.withColumn('ntile', ntile(2).over(windowSpec)).show(truncate=False)

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|James        |Sales     |3000  |1    |
|James        |Sales     |3000  |1    |
|Robert       |Sales     |4100  |1    |
|Saif         |Sales     |4100  |2    |
|Michael      |Sales     |4600  |2    |
|Maria        |Finance   |3000  |1    |
|Scott        |Finance   |3300  |1    |
|Jen          |Finance   |3900  |2    |
|Kumar        |Marketing |2000  |1    |
|Jeff         |Marketing |3000  |2    |
+-------------+----------+------+-----+



In [69]:
df.withColumn('ntile', ntile(3).over(windowSpec)).show(truncate=False)

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|James        |Sales     |3000  |1    |
|James        |Sales     |3000  |1    |
|Robert       |Sales     |4100  |2    |
|Saif         |Sales     |4100  |2    |
|Michael      |Sales     |4600  |3    |
|Maria        |Finance   |3000  |1    |
|Scott        |Finance   |3300  |2    |
|Jen          |Finance   |3900  |3    |
|Kumar        |Marketing |2000  |1    |
|Jeff         |Marketing |3000  |2    |
+-------------+----------+------+-----+



## Window Analytic functions 🔥🔥

### cume_dist Window Function 😇
* `cume_dist()` returns the cumulative distribution of values within a window partition: the fraction of rows whose value is less than or equal to the current row's value.

* This is the same as the CUME_DIST function in SQL.
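Unlike `percent_rank()`, `cume_dist()` counts every tied row as "at or below" the current one, so ties share the higher value. A plain-Python sketch of the definition (hypothetical helper) that matches the Sales and Finance numbers in the output below:

```python
def cume_dist(values):
    """Fraction of rows in the partition whose value is <= the current row's value."""
    ordered = sorted(values)
    n = len(ordered)
    return [sum(1 for other in ordered if other <= v) / n for v in ordered]


print(cume_dist([3000, 4600, 4100, 3000, 4100]))  # Sales: [0.4, 0.4, 0.8, 0.8, 1.0]
print(cume_dist([3000, 3300, 3900]))              # Finance: [1/3, 2/3, 1.0]
```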

In [70]:
df.withColumn('cume_dist', cume_dist().over(windowSpec)).show(truncate=False)

+-------------+----------+------+------------------+
|employee_name|department|salary|cume_dist         |
+-------------+----------+------+------------------+
|James        |Sales     |3000  |0.4               |
|James        |Sales     |3000  |0.4               |
|Robert       |Sales     |4100  |0.8               |
|Saif         |Sales     |4100  |0.8               |
|Michael      |Sales     |4600  |1.0               |
|Maria        |Finance   |3000  |0.3333333333333333|
|Scott        |Finance   |3300  |0.6666666666666666|
|Jen          |Finance   |3900  |1.0               |
|Kumar        |Marketing |2000  |0.5               |
|Jeff         |Marketing |3000  |1.0               |
+-------------+----------+------+------------------+



### lag Window Function 🚩
* returns the value `offset` rows before the current row within the partition (null when there is no such row)

In [71]:
df.withColumn('lag', lag('salary',2).over(windowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|lag |
+-------------+----------+------+----+
|James        |Sales     |3000  |null|
|James        |Sales     |3000  |null|
|Robert       |Sales     |4100  |3000|
|Saif         |Sales     |4100  |3000|
|Michael      |Sales     |4600  |4100|
|Maria        |Finance   |3000  |null|
|Scott        |Finance   |3300  |null|
|Jen          |Finance   |3900  |3000|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |null|
+-------------+----------+------+----+



In [72]:
df.withColumn('lag', lag('salary',1).over(windowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|lag |
+-------------+----------+------+----+
|James        |Sales     |3000  |null|
|James        |Sales     |3000  |3000|
|Robert       |Sales     |4100  |3000|
|Saif         |Sales     |4100  |4100|
|Michael      |Sales     |4600  |4100|
|Maria        |Finance   |3000  |null|
|Scott        |Finance   |3300  |3000|
|Jen          |Finance   |3900  |3300|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |2000|
+-------------+----------+------+----+



### lead Window Function 🥺
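* Returns the value `offset` rows after the current row within the partition (null when there is no such row); a negative offset looks backwards, like lag()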
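Both functions just shift the ordered partition by `offset` rows. A plain-Python sketch with hypothetical helpers, using the Sales salaries in window order, reproduces the Sales rows of the lag and lead outputs in this section, including `lead(..., -1)` matching `lag(..., 1)`:

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row of an ordered partition, else `default`."""
    return [
        values[i - offset] if 0 <= i - offset < len(values) else default
        for i in range(len(values))
    ]


def lead(values, offset=1, default=None):
    """Value `offset` rows after each row; a negative offset looks backwards."""
    return lag(values, -offset, default)


sales = [3000, 3000, 4100, 4100, 4600]  # Sales salaries in window order
print(lag(sales, 2))    # [None, None, 3000, 3000, 4100]
print(lead(sales, 2))   # [4100, 4100, 4600, None, None]
print(lead(sales, -1))  # [None, 3000, 3000, 4100, 4100], same as lag(sales, 1)
```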

In [73]:
df.withColumn('Lead',lead('salary', 2).over(windowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|Lead|
+-------------+----------+------+----+
|James        |Sales     |3000  |4100|
|James        |Sales     |3000  |4100|
|Robert       |Sales     |4100  |4600|
|Saif         |Sales     |4100  |null|
|Michael      |Sales     |4600  |null|
|Maria        |Finance   |3000  |3900|
|Scott        |Finance   |3300  |null|
|Jen          |Finance   |3900  |null|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |null|
+-------------+----------+------+----+



In [74]:
df.withColumn('Lead',lead('salary', 1).over(windowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|Lead|
+-------------+----------+------+----+
|James        |Sales     |3000  |3000|
|James        |Sales     |3000  |4100|
|Robert       |Sales     |4100  |4100|
|Saif         |Sales     |4100  |4600|
|Michael      |Sales     |4600  |null|
|Maria        |Finance   |3000  |3300|
|Scott        |Finance   |3300  |3900|
|Jen          |Finance   |3900  |null|
|Kumar        |Marketing |2000  |3000|
|Jeff         |Marketing |3000  |null|
+-------------+----------+------+----+



In [75]:
df.withColumn('Lead',lead('salary', -1).over(windowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|Lead|
+-------------+----------+------+----+
|James        |Sales     |3000  |null|
|James        |Sales     |3000  |3000|
|Robert       |Sales     |4100  |3000|
|Saif         |Sales     |4100  |4100|
|Michael      |Sales     |4600  |4100|
|Maria        |Finance   |3000  |null|
|Scott        |Finance   |3300  |3000|
|Jen          |Finance   |3900  |3300|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |2000|
+-------------+----------+------+----+



## Window Aggregate Functions ☀️

In [76]:
windowSpecAgg  = Window.partitionBy("department")

df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|     Sales|3760.0|18800|3000|4600|
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+



## Date and Timestamp Functions 📅
* Date and timestamp functions are supported on DataFrames and in SQL queries, and they work much like their traditional SQL counterparts. They are especially important if you use PySpark for ETL. Most of these functions accept a Date, a Timestamp, or a String as input; a String must be in a default format that can be cast to a date.

* DateType default format is yyyy-MM-dd
* TimestampType default format is yyyy-MM-dd HH:mm:ss, optionally followed by fractional seconds (for example .SSS)
* With ANSI SQL mode disabled (the default in Spark 3.x), these functions return null if an input string cannot be cast to Date or Timestamp.

### Date Functions 

In [77]:
# let's build the data
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()

+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



In [78]:
df.select(current_date().alias('current_date')).show(1)

+------------+
|current_date|
+------------+
|  2021-09-24|
+------------+
only showing top 1 row



In [79]:
# The example below uses date_format() to parse the date and convert it from yyyy-MM-dd to MM-dd-yyyy format.
df.select(
    col('input'),
    date_format(col('input'), 'MM-dd-yyyy').alias('date_format')
).show()

+----------+-----------+
|     input|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2019-03-01| 03-01-2019|
|2021-03-01| 03-01-2021|
+----------+-----------+



In [80]:
# The example below converts a yyyy-MM-dd string into a DateType column using to_date(). The pattern argument can describe other input formats too; Spark's datetime patterns are based on Java's DateTimeFormatter.

df.select(
    col('input'),
    to_date(col('input'), 'yyyy-MM-dd').alias('to_date')
).show()

+----------+----------+
|     input|   to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2019-03-01|2019-03-01|
|2021-03-01|2021-03-01|
+----------+----------+



In [81]:
# The example below returns the number of days between two dates using datediff(); the results depend on the current date (this notebook ran on 2021-09-24).
df.select(
    col('input'),
    datediff(current_date(), col('input')).alias('datediff')
).show()

+----------+--------+
|     input|datediff|
+----------+--------+
|2020-02-01|     601|
|2019-03-01|     938|
|2021-03-01|     207|
+----------+--------+



In [82]:
# The below example returns the months between two dates using months_between().

df.select(
    col('input'),
    months_between(current_date(), col('input')).alias('months_between')
).show()

+----------+--------------+
|     input|months_between|
+----------+--------------+
|2020-02-01|   19.74193548|
|2019-03-01|   30.74193548|
|2021-03-01|    6.74193548|
+----------+--------------+



In [83]:
# The example below truncates the date to a specified unit using trunc().
df.select(col("input"), 
    trunc(col("input"),"Month").alias("Month_Trunc"), 
    trunc(col("input"),"Year").alias("Year_Trunc")
   ).show()

+----------+-----------+----------+
|     input|Month_Trunc|Year_Trunc|
+----------+-----------+----------+
|2020-02-01| 2020-02-01|2020-01-01|
|2019-03-01| 2019-03-01|2019-01-01|
|2021-03-01| 2021-03-01|2021-01-01|
+----------+-----------+----------+



In [84]:
# Here we add and subtract months and days from the given input; date_add() with a negative number subtracts days (date_sub() does the same with a positive number).
df.select(
    col('input'),
    add_months(col('input'), 3).alias('add_months'),
    add_months(col('input'), -3).alias('sub_months'),
    date_add(col('input'), 3).alias('date_add'),
    date_add(col('input'), -3).alias('date_sub')
).show()

+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-04|2020-01-29|
|2019-03-01|2019-06-01|2018-12-01|2019-03-04|2019-02-26|
|2021-03-01|2021-06-01|2020-12-01|2021-03-04|2021-02-26|
+----------+----------+----------+----------+----------+



In [85]:
df.select(col("input"), 
     year(col("input")).alias("year"), 
     month(col("input")).alias("month"), 
     next_day(col("input"),"Sunday").alias("next_day"), 
     weekofyear(col("input")).alias("weekofyear") 
  ).show()


+----------+----+-----+----------+----------+
|     input|year|month|  next_day|weekofyear|
+----------+----+-----+----------+----------+
|2020-02-01|2020|    2|2020-02-02|         5|
|2019-03-01|2019|    3|2019-03-03|         9|
|2021-03-01|2021|    3|2021-03-07|         9|
+----------+----+-----+----------+----------+



In [86]:
df.select(col("input"),  
     dayofweek(col("input")).alias("dayofweek"), 
     dayofmonth(col("input")).alias("dayofmonth"), 
     dayofyear(col("input")).alias("dayofyear"), 
  ).show()

+----------+---------+----------+---------+
|     input|dayofweek|dayofmonth|dayofyear|
+----------+---------+----------+---------+
|2020-02-01|        7|         1|       32|
|2019-03-01|        6|         1|       60|
|2021-03-01|        2|         1|       60|
+----------+---------+----------+---------+



### Timestamp Functions ⌛️⌛️

In [87]:
data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |02-01-2020 11 01 19 06 |
|2  |03-01-2019 12 01 19 406|
|3  |03-01-2021 12 01 19 406|
+---+-----------------------+



In [88]:
df2.select(current_timestamp().alias('Current_timestamp')).show(truncate=False)

+--------------------------+
|Current_timestamp         |
+--------------------------+
|2021-09-24 12:31:34.285474|
|2021-09-24 12:31:34.285474|
|2021-09-24 12:31:34.285474|
+--------------------------+



In [89]:
import pyspark.sql.functions as sf

In [90]:
df2.select(
    col('input'),
    sf.to_timestamp(col('input'), 'MM-dd-yyyy HH mm ss SSS').alias('to_timestamp')
).show(truncate=False)

+-----------------------+-----------------------+
|input                  |to_timestamp           |
+-----------------------+-----------------------+
|02-01-2020 11 01 19 06 |2020-02-01 11:01:19.06 |
|03-01-2019 12 01 19 406|2019-03-01 12:01:19.406|
|03-01-2021 12 01 19 406|2021-03-01 12:01:19.406|
+-----------------------+-----------------------+



In [91]:
data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","input"])

In [92]:
df3.show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |2020-02-01 11:01:19.06 |
|2  |2019-03-01 12:01:19.406|
|3  |2021-03-01 12:01:19.406|
+---+-----------------------+



In [93]:
df3.select(
    col('input'),
    sf.hour(col('input')).alias('hour'),
    sf.minute(col('input')).alias('minute'),
    sf.second(col('input')).alias('second')
).show(truncate=False)

+-----------------------+----+------+------+
|input                  |hour|minute|second|
+-----------------------+----+------+------+
|2020-02-01 11:01:19.06 |11  |1     |19    |
|2019-03-01 12:01:19.406|12  |1     |19    |
|2021-03-01 12:01:19.406|12  |1     |19    |
+-----------------------+----+------+------+



## DataFrame with Examples 📊
* You can manually create a PySpark DataFrame using the `toDF()` and `createDataFrame()` methods. Both accept different signatures, so you can build a DataFrame from an existing RDD, a Python list, or another DataFrame.
* You can also create a PySpark DataFrame from data sources in formats such as TXT, CSV, JSON, ORC, Avro, Parquet, and XML, reading from HDFS, S3, DBFS, Azure Blob storage, etc. Depending on the Spark version, some of these (for example Avro and XML) may require an extra package.
* Finally, a PySpark DataFrame can also be created by reading data from RDBMS and NoSQL databases.



In [94]:
# let's create a small dataset
columns = ['langugage','users_count']
data = [('Java', '20000'), ('Python','100000'),('Scala', '3000')]

In [96]:
# create an RDD from the list (it is converted to DataFrames below)
rdd = spark.sparkContext.parallelize(data)
rdd.collect()
rdd

ParallelCollectionRDD[318] at readRDDFromFile at PythonRDD.scala:274

In [97]:
# using toDF(); without arguments the columns get default names _1, _2
dfFromRDD1 = rdd.toDF()
dfFromRDD1

DataFrame[_1: string, _2: string]

In [98]:
dfFromRDD1.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [99]:
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

root
 |-- langugage: string (nullable = true)
 |-- users_count: string (nullable = true)



### Using `createDataFrame()`

In [100]:
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2

DataFrame[langugage: string, users_count: string]

In [101]:
dfFromRDD2.printSchema()

root
 |-- langugage: string (nullable = true)
 |-- users_count: string (nullable = true)



In [103]:
dfFromData2 = spark.createDataFrame(data).toDF(*columns)
dfFromData2.printSchema()

root
 |-- langugage: string (nullable = true)
 |-- users_count: string (nullable = true)



In [104]:
dfFromData2.collect()

[Row(langugage='Java', users_count='20000'),
 Row(langugage='Python', users_count='100000'),
 Row(langugage='Scala', users_count='3000')]

### Using `createDataFrame()` with `Row` type 🤨🤨

In [107]:
import pyspark.sql as sql
rowData = map(lambda x : sql.Row(*x), data)
dfFromData3 = spark.createDataFrame(rowData, columns)
dfFromData3.printSchema()

root
 |-- langugage: string (nullable = true)
 |-- users_count: string (nullable = true)



In [108]:
dfFromData3.collect()

[Row(langugage='Java', users_count='20000'),
 Row(langugage='Python', users_count='100000'),
 Row(langugage='Scala', users_count='3000')]

### Create DataFrame with schema 💪💪
* To specify column names together with their data types, create a `StructType` schema first and pass it when creating the DataFrame.

In [109]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

In [110]:
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

In [111]:
df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [112]:
df.show(truncate=False)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



### Create DataFrame using data sources 🔔🔔🔔
* `df2 = spark.read.csv("/src/resources/file.csv")`
* `df2 = spark.read.text("/src/resources/file.txt")`
* `df2 = spark.read.json("/src/resources/file.json")`

### Convert the DataFrame to Pandas DF 🐼🐼

In [115]:
pandasDF = df.toPandas()
pandasDF

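|   | firstname | middlename | lastname | id    | gender | salary |
|---|-----------|------------|----------|-------|--------|--------|
| 0 | James     |            | Smith    | 36636 | M      | 3000   |
| 1 | Michael   | Rose       |          | 40288 | M      | 4000   |
| 2 | Robert    |            | Williams | 42114 | M      | 4000   |
| 3 | Maria     | Anne       | Jones    | 39192 | F      | 4000   |
| 4 | Jen       | Mary       | Brown    |       | F      | -1     |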


## User Defined Functions 🎨🎨
* UDFs are among the most expensive operations: with a Python UDF every row is serialized between the JVM and a Python worker, and Spark's optimizer cannot look inside the function. Use them only when they are essential and no built-in function fits.
* PySpark UDFs are similar to UDFs in traditional databases. In PySpark, you write a function in Python and either wrap it with `udf()` to use it on a DataFrame, or register it with `spark.udf.register()` to use it in SQL.
* UDFs extend the framework's functions, and once created they can be reused on many DataFrames and SQL expressions. For example, suppose you want to capitalize the first letter of every word in a name string. The cells below implement this as a UDF for illustration, although Spark SQL already provides `initcap()` for this case.
* Before you create any UDF, check whether the function you need is already available in Spark SQL functions. PySpark SQL provides many predefined functions, and new ones are added with every release, so it is best to check before reinventing the wheel.

### Steps to create and use a UDF 🪜🪜
* Create a Python Function
* Convert a Python function to PySpark UDF
* Using UDF with DataFrame
    * Using UDF with PySpark DataFrame withColumn()
    * Registering PySpark UDF & use it on SQL

In [116]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [117]:
def convertCase(s):
    '''
    Returns the input string with the first letter of every word capitalized.
    '''
    if s is None:  # null values reach the UDF as None
        return None
    # join with single spaces so no trailing space is left behind
    return ' '.join(x[0:1].upper() + x[1:] for x in s.split(' '))

In [118]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
convertUDF = udf(convertCase, StringType())

In [119]:
df.select(
    col('Seqno'),
    convertUDF(col('Name')).alias('Name')
).show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |John Jones  |
|2    |Tracey Smith|
|3    |Amy Sanders |
+-----+------------+



In [6]:
lst = ['  abcd  ',  'abcd    aa bb ccc   ']
for i in lst:
    i = ' '.join((i.strip()).split())
    print(i)

abcd
abcd aa bb ccc
