In [1]:
#import pyspark
from pyspark.sql import SparkSession

Spark – Default interface for Scala and Java

PySpark – Python interface for Spark

sparklyr – R interface for Spark.


Modules & packages:

- PySpark RDD (pyspark.RDD)

- PySpark DataFrame and SQL (pyspark.sql)

- PySpark Streaming (pyspark.streaming)

- PySpark MLlib (pyspark.ml, pyspark.mllib)

- PySpark GraphFrames (GraphFrames)

- PySpark Resource (pyspark.resource), new in PySpark 3.0

- A SparkSession is created through the SparkSession.builder attribute (finished with getOrCreate()) or derived from an existing session with its newSession() method.

A SparkSession internally creates a SparkContext, exposed as its sparkContext attribute.

You can create multiple SparkSession objects, but only one SparkContext can be active per JVM.

- To create a new SparkContext, first stop the existing one with stop().

In [2]:
# create spark session
spark = SparkSession.builder.appName('SparkSession1').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/19 17:44:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:

# Create DataFrame in PySpark Shell
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000),('R',80000)]


- Use the createDataFrame() method of the SparkSession to create a DataFrame from local data. Without a schema, the columns get the default names _1, _2, and so on.

In [4]:
df = spark.createDataFrame(data)
df.show()

                                                                                

+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
|     R| 80000|
+------+------+



In [5]:
df.head(1)

[Row(_1='Java', _2=20000)]

In [6]:
df.columns

['_1', '_2']

In [7]:
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [8]:
df.dtypes

[('_1', 'string'), ('_2', 'bigint')]

In [9]:
df.withColumnRenamed('_1','Column 1').printSchema()

root
 |-- Column 1: string (nullable = true)
 |-- _2: long (nullable = true)



In [10]:
df.withColumnRenamed('_1','Column 1').withColumnRenamed('_2','Column 2').printSchema()

root
 |-- Column 1: string (nullable = true)
 |-- Column 2: long (nullable = true)



In [11]:
df2 = df.withColumnRenamed('_1','Column 1').withColumnRenamed('_2','Column 2')

In [12]:
df2.printSchema()

root
 |-- Column 1: string (nullable = true)
 |-- Column 2: long (nullable = true)



In [13]:
df2.show()

+--------+--------+
|Column 1|Column 2|
+--------+--------+
|    Java|   20000|
|  Python|  100000|
|   Scala|    3000|
|       R|   80000|
+--------+--------+



In [14]:

data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]



In [15]:

# define the column names
columns = ["firstname","middlename","lastname","dob","gender","salary"]

# create a DataFrame from the data and the column names (schema)

df3 = spark.createDataFrame(data=data, schema = columns)
df3.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



DataFrames can also be created from external sources, such as files on the local file system, HDFS, S3, or Azure storage, and from data stores such as HBase or MySQL tables.

Below is an example of how to read a CSV file from the local file system.


In [16]:
# create spark session
spark = SparkSession.builder.appName('SparkforSql').getOrCreate()

In [17]:

df4 = spark.read.option("header",True) \
    .csv("country_codes.csv")


#df = spark.read.option("header",True) \
#     .csv("/tmp/resources/zipcodes.csv")


In [18]:
df4.show()

+-------------------+------------+------------+-------+
|            Country|Alpha-2 code|Alpha-3 code|Numeric|
+-------------------+------------+------------+-------+
|        Afghanistan|          AF|         AFG|      4|
|            Albania|          AL|         ALB|      8|
|            Algeria|          DZ|         DZA|     12|
|     American Samoa|          AS|         ASM|     16|
|            Andorra|          AD|         AND|     20|
|             Angola|          AO|         AGO|     24|
|           Anguilla|          AI|         AIA|    660|
|         Antarctica|          AQ|         ATA|     10|
|Antigua and Barbuda|          AG|         ATG|     28|
|          Argentina|          AR|         ARG|     32|
|            Armenia|          AM|         ARM|     51|
|              Aruba|          AW|         ABW|    533|
|          Australia|          AU|         AUS|     36|
|            Austria|          AT|         AUT|     40|
|         Azerbaijan|          AZ|         AZE| 

In [19]:
# array data
arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})
        ]

arrayData



[('James', ['Java', 'Scala'], {'hair': 'black', 'eye': 'brown'}),
 ('Michael', ['Spark', 'Java', None], {'hair': 'brown', 'eye': None}),
 ('Robert', ['CSharp', ''], {'hair': 'red', 'eye': ''}),
 ('Washington', None, None),
 ('Jefferson', ['1', '2'], {})]

In [20]:

df5 = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df5.printSchema()
df5.show()


root
 |-- name: string (nullable = true)
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               null|                null|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+



In [32]:
# To use SQL, first create a temporary view on the DataFrame
# with createOrReplaceTempView().

df5.createOrReplaceTempView("PERSON_Lenguages")  # previous DF
df6 = spark.sql("SELECT * from PERSON_Lenguages")



In [33]:
df6.printSchema()
df6.show()


root
 |-- name: string (nullable = true)
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               null|                null|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+



In [25]:
# new data

data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df7 = spark.createDataFrame(data=data, schema = columns)
df7.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [27]:
# using createOrReplaceTempView() function.

df7.createOrReplaceTempView("PERSON_DATA")  # previous DF
df9 = spark.sql("SELECT * from PERSON_DATA")
df9.show()
df9.printSchema()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [28]:
# show the results of the queries against the two temporary views
df6.show()
df9.show()

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               null|                null|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+


In [35]:

groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()


+------+--------+
|gender|count(1)|
+------+--------+
|     M|       3|
|     F|       2|
+------+--------+



In [36]:
groupDFL = spark.sql("SELECT  knownLanguages from PERSON_Lenguages")
groupDFL.show()

+-------------------+
|     knownLanguages|
+-------------------+
|      [Java, Scala]|
|[Spark, Java, null]|
|         [CSharp, ]|
|               null|
|             [1, 2]|
+-------------------+



In [39]:
groupDF.show()
groupDF.printSchema()


+------+--------+
|gender|count(1)|
+------+--------+
|     M|       3|
|     F|       2|
+------+--------+

root
 |-- gender: string (nullable = true)
 |-- count(1): long (nullable = false)



                                                                                

In [40]:
groupDFL.show()
groupDFL.printSchema()

+-------------------+
|     knownLanguages|
+-------------------+
|      [Java, Scala]|
|[Spark, Java, null]|
|         [CSharp, ]|
|               null|
|             [1, 2]|
+-------------------+

root
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)



Add months

In [41]:
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkMonth').getOrCreate()

from pyspark.sql.functions import col,expr



In [42]:
data=[("2019-01-23",1),("2019-06-24",2),("2019-09-20",3)]
data

[('2019-01-23', 1), ('2019-06-24', 2), ('2019-09-20', 3)]

In [43]:
spark.createDataFrame(data).toDF("date","increment") \
    .select(col("date"),col("increment"), \
      expr("add_months(to_date(date,'yyyy-MM-dd'),cast(increment as int))").alias("inc_date")) \
    .show()

+----------+---------+----------+
|      date|increment|  inc_date|
+----------+---------+----------+
|2019-01-23|        1|2019-02-23|
|2019-06-24|        2|2019-08-24|
|2019-09-20|        3|2019-12-20|
+----------+---------+----------+



Add new columns

In [44]:

data = [('James','Smith','M',3000),
  ('Anna','Rose','F',4100),
  ('Robert','Williams','M',6200), 
]

columns = ["firstname","lastname","gender","salary"]

df = spark.createDataFrame(data=data, schema = columns)
df.show()


+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+



In [45]:

if 'salary1' not in df.columns:
    print("aa")
    
# Add new constant column
from pyspark.sql.functions import lit

df.withColumn("bonus_percent", lit(0.3)) \
  .show()
  
#Add column from existing column
df.withColumn("bonus_amount", df.salary*0.3) \
  .show()

#Add column by concatenating existing columns
from pyspark.sql.functions import concat_ws

df.withColumn("name", concat_ws(",","firstname",'lastname')) \
  .show()


#Add current date
from pyspark.sql.functions import current_date

df.withColumn("current_date", current_date()) \
  .show()


aa
+---------+--------+------+------+-------------+
|firstname|lastname|gender|salary|bonus_percent|
+---------+--------+------+------+-------------+
|    James|   Smith|     M|  3000|          0.3|
|     Anna|    Rose|     F|  4100|          0.3|
|   Robert|Williams|     M|  6200|          0.3|
+---------+--------+------+------+-------------+

+---------+--------+------+------+------------+
|firstname|lastname|gender|salary|bonus_amount|
+---------+--------+------+------+------------+
|    James|   Smith|     M|  3000|       900.0|
|     Anna|    Rose|     F|  4100|      1230.0|
|   Robert|Williams|     M|  6200|      1860.0|
+---------+--------+------+------+------------+

+---------+--------+------+------+---------------+
|firstname|lastname|gender|salary|           name|
+---------+--------+------+------+---------------+
|    James|   Smith|     M|  3000|    James,Smith|
|     Anna|    Rose|     F|  4100|      Anna,Rose|
|   Robert|Williams|     M|  6200|Robert,Williams|
+---------

In [46]:

from pyspark.sql.functions import when

df.withColumn("grade", \
   when((df.salary < 4000), lit("A")) \
     .when((df.salary >= 4000) & (df.salary <= 5000), lit("B")) \
     .otherwise(lit("C")) \
  ).show()


+---------+--------+------+------+-----+
|firstname|lastname|gender|salary|grade|
+---------+--------+------+------+-----+
|    James|   Smith|     M|  3000|    A|
|     Anna|    Rose|     F|  4100|    B|
|   Robert|Williams|     M|  6200|    C|
+---------+--------+------+------+-----+



In [47]:


# Add column using select
df.select("firstname","salary", lit(0.3).alias("bonus")).show()
df.select("firstname","salary", (df.salary * 0.3).alias("bonus_amount")).show()
df.select("firstname","salary", current_date().alias("today_date")).show()



+---------+------+-----+
|firstname|salary|bonus|
+---------+------+-----+
|    James|  3000|  0.3|
|     Anna|  4100|  0.3|
|   Robert|  6200|  0.3|
+---------+------+-----+

+---------+------+------------+
|firstname|salary|bonus_amount|
+---------+------+------------+
|    James|  3000|       900.0|
|     Anna|  4100|      1230.0|
|   Robert|  6200|      1860.0|
+---------+------+------------+

+---------+------+----------+
|firstname|salary|today_date|
+---------+------+----------+
|    James|  3000|2023-01-19|
|     Anna|  4100|2023-01-19|
|   Robert|  6200|2023-01-19|
+---------+------+----------+



In [48]:

#Add columns using SQL
df.createOrReplaceTempView("PER") # temporary table

spark.sql("select firstname,salary, '0.3' as bonus from PER").show()
spark.sql("select firstname,salary, salary * 0.3 as bonus_amount from PER").show()
spark.sql("select firstname,salary, current_date() as today_date from PER").show()
# searched CASE: the condition goes directly after WHEN
spark.sql("select firstname,salary, " +
          "case when salary < 4000 then 'A' "+
          "else 'B' END as grade from PER").show()



+---------+------+-----+
|firstname|salary|bonus|
+---------+------+-----+
|    James|  3000|  0.3|
|     Anna|  4100|  0.3|
|   Robert|  6200|  0.3|
+---------+------+-----+

+---------+------+------------+
|firstname|salary|bonus_amount|
+---------+------+------------+
|    James|  3000|       900.0|
|     Anna|  4100|      1230.0|
|   Robert|  6200|      1860.0|
+---------+------+------------+

+---------+------+----------+
|firstname|salary|today_date|
+---------+------+----------+
|    James|  3000|2023-01-19|
|     Anna|  4100|2023-01-19|
|   Robert|  6200|2023-01-19|
+---------+------+----------+

+---------+------+-----+
|firstname|salary|grade|
+---------+------+-----+
|    James|  3000|    A|
|     Anna|  4100|    B|
|   Robert|  6200|    B|
+---------+------+-----+



In [49]:
spark.sql("select * from PER").show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+



In [53]:

if 'salary1' not in df.columns:
    print("aa")
    
# Add new constant column
from pyspark.sql.functions import lit

dfM = df.withColumn("bonus_percent", lit(0.3))
dfM.show()

aa
+---------+--------+------+------+-------------+
|firstname|lastname|gender|salary|bonus_percent|
+---------+--------+------+------+-------------+
|    James|   Smith|     M|  3000|          0.3|
|     Anna|    Rose|     F|  4100|          0.3|
|   Robert|Williams|     M|  6200|          0.3|
+---------+--------+------+------+-------------+



In [54]:
#Add column from existing column
dfM = dfM.withColumn("bonus_amount", dfM.salary*0.3)
dfM.show()

+---------+--------+------+------+-------------+------------+
|firstname|lastname|gender|salary|bonus_percent|bonus_amount|
+---------+--------+------+------+-------------+------------+
|    James|   Smith|     M|  3000|          0.3|       900.0|
|     Anna|    Rose|     F|  4100|          0.3|      1230.0|
|   Robert|Williams|     M|  6200|          0.3|      1860.0|
+---------+--------+------+------+-------------+------------+



In [55]:
#Add column by concatenating existing columns
from pyspark.sql.functions import concat_ws

dfM = dfM.withColumn("name", concat_ws(",","firstname",'lastname')) 


#Add current date
from pyspark.sql.functions import current_date

dfM = dfM.withColumn("current_date", current_date())

dfM.show()
dfM.printSchema()

+---------+--------+------+------+-------------+------------+---------------+------------+
|firstname|lastname|gender|salary|bonus_percent|bonus_amount|           name|current_date|
+---------+--------+------+------+-------------+------------+---------------+------------+
|    James|   Smith|     M|  3000|          0.3|       900.0|    James,Smith|  2023-01-19|
|     Anna|    Rose|     F|  4100|          0.3|      1230.0|      Anna,Rose|  2023-01-19|
|   Robert|Williams|     M|  6200|          0.3|      1860.0|Robert,Williams|  2023-01-19|
+---------+--------+------+------+-------------+------------+---------------+------------+

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- bonus_percent: double (nullable = false)
 |-- bonus_amount: double (nullable = true)
 |-- name: string (nullable = false)
 |-- current_date: date (nullable = false)



In [57]:

from pyspark.sql.functions import when

dfM = dfM.withColumn("grade", \
   when((dfM.salary < 4000), lit("A")) \
     .when((dfM.salary >= 4000) & (dfM.salary <= 5000), lit("B")) \
     .otherwise(lit("C")) \
  )

dfM.show()

+---------+--------+------+------+-------------+------------+---------------+------------+-----+
|firstname|lastname|gender|salary|bonus_percent|bonus_amount|           name|current_date|grade|
+---------+--------+------+------+-------------+------------+---------------+------------+-----+
|    James|   Smith|     M|  3000|          0.3|       900.0|    James,Smith|  2023-01-19|    A|
|     Anna|    Rose|     F|  4100|          0.3|      1230.0|      Anna,Rose|  2023-01-19|    B|
|   Robert|Williams|     M|  6200|          0.3|      1860.0|Robert,Williams|  2023-01-19|    C|
+---------+--------+------+------+-------------+------------+---------------+------------+-----+



Drop columns

In [60]:
# drop() accepts several column names in one call
dfM = dfM.drop('firstname', 'lastname')

In [61]:
dfM.show()

+------+------+-------------+------------+---------------+------------+-----+
|gender|salary|bonus_percent|bonus_amount|           name|current_date|grade|
+------+------+-------------+------------+---------------+------------+-----+
|     M|  3000|          0.3|       900.0|    James,Smith|  2023-01-19|    A|
|     F|  4100|          0.3|      1230.0|      Anna,Rose|  2023-01-19|    B|
|     M|  6200|          0.3|      1860.0|Robert,Williams|  2023-01-19|    C|
+------+------+-------------+------------+---------------+------------+-----+



Aggregate functions 

In [63]:

#import pyspark
#from pyspark.sql import SparkSession

# Note: importing sum, max and min by name shadows the Python built-ins of the same name.
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
# sumDistinct is deprecated since Spark 3.2 in favor of sum_distinct
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop

#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
  
  
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
#df.show(truncate=False)
df.show()


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [75]:

print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))

print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

df.select(collect_list("salary")).show(truncate=False) # all values, duplicates included

df.select(collect_set("salary")).show(truncate=False) # distinct values only



approx_count_distinct: 6
avg: 3400.0
+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+

+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+



In [68]:


df2 = df.select(countDistinct("department", "salary"))
df2.show(truncate=False)

print("Distinct Count of Department & Salary: "+str(df2.collect()))

print("count: "+str(df.select(count("salary")).collect()[0][0]))


+----------------------------------+
|count(DISTINCT department, salary)|
+----------------------------------+
|8                                 |
+----------------------------------+

Distinct Count of Department & Salary: [Row(count(DISTINCT department, salary)=8)]
count: 10


In [69]:
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)

+-------------+
|first(salary)|
+-------------+
|3000         |
+-------------+

+------------+
|last(salary)|
+------------+
|4100        |
+------------+

+-------------------+
|kurtosis(salary)   |
+-------------------+
|-0.6467803030303032|
+-------------------+

+-----------+
|max(salary)|
+-----------+
|4600       |
+-----------+

+-----------+
|min(salary)|
+-----------+
|2000       |
+-----------+

+-----------+
|avg(salary)|
+-----------+
|3400.0     |
+-----------+

+--------------------+
|skewness(salary)    |
+--------------------+
|-0.12041791181069571|
+--------------------+

+-------------------+-------------------+------------------+
|stddev_samp(salary)|stddev_samp(salary)|stddev_pop(salary)|
+-------------------+-------------------+------------------+
|765.9416862050705  |765.9416862050705  |726.636084983398  |
+-------------------+-------------------+------------------+

+-----------+
|sum(salary)|
+-----------+
|34000      |
+-----------+





+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+

+-----------------+-----------------+---------------+
|var_samp(salary) |var_samp(salary) |var_pop(salary)|
+-----------------+-----------------+---------------+
|586666.6666666666|586666.6666666666|528000.0       |
+-----------------+-----------------+---------------+



Order by - Group by

In [77]:
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum,avg,max

spark = SparkSession.builder \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NV",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","DE",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","NV",80000,25,18000),
    ("Kumar","Marketing","NJ",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NV   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |DE   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |NV   |80000 |25 |18000|
|Kumar        |Marketing |NJ   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [80]:
dfSort=df.sort(df.state,df.salary, ascending=False)

dfSort.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|      Michael|     Sales|   NV| 86000| 56|20000|
|         Jeff| Marketing|   NV| 80000| 25|18000|
|        Kumar| Marketing|   NJ| 91000| 50|21000|
|        Raman|   Finance|   DE| 99000| 40|24000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+



In [82]:
# groupBy() does not keep the earlier sort order; the result has one row per state
dfSort=dfSort.groupBy(df.state).agg(sum(df.salary))
dfSort.show()

+-----+-----------+
|state|sum(salary)|
+-----+-----------+
|   NY|     252000|
|   NV|     166000|
|   CA|     171000|
|   DE|      99000|
|   NJ|      91000|
+-----+-----------+

