In [0]:
spark

In [0]:
from pyspark.sql import SparkSession

# Reuse the active session when there is one; otherwise start a new one.
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
# Later RDD cells call `sc` directly, so bind it here instead of relying on the
# host environment to predefine it.
sc = spark.sparkContext


In [0]:
columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

In [0]:
rdd = spark.sparkContext.parallelize(data)
rdd.getNumPartitions()

Out[46]: 8

In [0]:
dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()


root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [0]:
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



In [0]:
df2 = spark.createDataFrame(rdd).toDF(*columns)
df2.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [0]:
df3 = spark.createDataFrame(data).toDF(*columns)
df3.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [0]:
from pyspark.sql import Row
# Wrap each tuple of `data` in a positional Row.
# NOTE: map() returns a lazy, one-shot iterator (see the `<map at ...>` output),
# so once a later cell has consumed rowData this cell must be re-run before
# rowData can be used again.
rowData = map(lambda x: Row(*x) ,data)
rowData

Out[51]: <map at 0x7f109834ddf0>

In [0]:
dfFromData3 = spark.createDataFrame(rowData,columns)
dfFromData3.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]
# Explicit schema for data2: five nullable string fields followed by a nullable
# integer salary.
schema = StructType(
    [StructField(field, StringType(), True)
     for field in ("firstname", "middlename", "lastname", "id", "gender")]
    + [StructField("salary", IntegerType(), True)]
)

In [0]:
df =spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [0]:
emptyRDD = spark.sparkContext.emptyRDD()


In [0]:
df1 = emptyRDD.toDF(schema)
df1.show()

+---------+----------+--------+---+------+------+
|firstname|middlename|lastname| id|gender|salary|
+---------+----------+--------+---+------+------+
+---------+----------+--------+---+------+------+



In [0]:
df2 = spark.createDataFrame([], schema)
df2.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [0]:
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = spark.sparkContext.parallelize(dept)

deptSchema = StructType([       
    StructField('dept_name', StringType(), True),
    StructField('dept_id', StringType(), True)
])

deptDF1 = spark.createDataFrame(rdd, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: string (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [0]:
pandasDF = deptDF1.toPandas()
pandasDF.head()

Unnamed: 0,dept_name,dept_id
0,Finance,10
1,Marketing,20
2,Sales,30
3,IT,40


In [0]:
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|{James, , Smith}    |36636|M     |3100  |
|{Michael, Rose, }   |40288|M     |4300  |
|{Robert, , Williams}|42114|M     |1400  |
|{Maria, Anne, Jones}|39192|F     |5500  |
|{Jen, Mary, Brown}  |     |F     |-1    |
+--------------------+-----+------+------+



In [0]:
from pyspark.sql.functions import col,struct,when
# Bundle the flat id/gender/salary columns into a single nested struct column.
# The id field is named "identifier", the same name the Salary_Grade example uses.
updatedDF = df2.withColumn(
    "OtherInfo",
    struct(
        col("id").alias("identifier"),
        col("gender").alias("gender"),
        col("salary").alias("salary"),
    ),
)
updatedDF.show()

+--------------------+-----+------+------+----------------+
|                name|   id|gender|salary|       OtherInfo|
+--------------------+-----+------+------+----------------+
|    {James, , Smith}|36636|     M|  3100|{36636, M, 3100}|
|   {Michael, Rose, }|40288|     M|  4300|{40288, M, 4300}|
|{Robert, , Williams}|42114|     M|  1400|{42114, M, 1400}|
|{Maria, Anne, Jones}|39192|     F|  5500|{39192, F, 5500}|
|  {Jen, Mary, Brown}|     |     F|    -1|       {, F, -1}|
+--------------------+-----+------+------+----------------+



In [0]:
updatedDF = df2.withColumn("OtherInfo", 
    struct(col("id").alias("identifier"),
    col("gender").alias("gender"),
    col("salary").alias("salary"),
    when(col("salary").cast(IntegerType()) < 2000,"Low")
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")
      .otherwise("High").alias("Salary_Grade")
  )).drop("id","gender","salary")

updatedDF.printSchema()
updatedDF.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

+--------------------+------------------------+
|name                |OtherInfo               |
+--------------------+------------------------+
|{James, , Smith}    |{36636, M, 3100, Medium}|
|{Michael, Rose, }   |{40288, M, 4300, High}  |
|{Robert, , Williams}|{42114, M, 1400, Low}   |
|{Maria, Anne, Jones}|{39192, F, 5500, High}  |
|{Jen, Mary, Brown}  |{, F, -1, Low}          |
+--------------------+------------------------+



In [0]:
# Grade each salary: below 2000 -> "Low", below 4000 -> "Medium", otherwise "High".
# withColumn() names the result "SalaryBand", so no alias is attached to the
# expression, and salary is already an integer column, so no cast is needed.
updatedDF = df2.withColumn(
    "SalaryBand",
    when(col("salary") < 2000, "Low")
    .when(col("salary") < 4000, "Medium")
    .otherwise("High"),
)

updatedDF.printSchema()
updatedDF.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- SalaryBand: string (nullable = false)

+--------------------+-----+------+------+----------+
|name                |id   |gender|salary|SalaryBand|
+--------------------+-----+------+------+----------+
|{James, , Smith}    |36636|M     |3100  |Medium    |
|{Michael, Rose, }   |40288|M     |4300  |High      |
|{Robert, , Williams}|42114|M     |1400  |Low       |
|{Maria, Anne, Jones}|39192|F     |5500  |High      |
|{Jen, Mary, Brown}  |     |F     |-1    |Low       |
+--------------------+-----+------+------+----------+



In [0]:
updatedDF.select("*").show()


+--------------------+-----+------+------+----------+
|                name|   id|gender|salary|SalaryBand|
+--------------------+-----+------+------+----------+
|    {James, , Smith}|36636|     M|  3100|    Medium|
|   {Michael, Rose, }|40288|     M|  4300|      High|
|{Robert, , Williams}|42114|     M|  1400|       Low|
|{Maria, Anne, Jones}|39192|     F|  5500|      High|
|  {Jen, Mary, Brown}|     |     F|    -1|       Low|
+--------------------+-----+------+------+----------+



In [0]:
updatedDF.select(col("id"),col("name")).show()


+-----+--------------------+
|   id|                name|
+-----+--------------------+
|36636|    {James, , Smith}|
|40288|   {Michael, Rose, }|
|42114|{Robert, , Williams}|
|39192|{Maria, Anne, Jones}|
|     |  {Jen, Mary, Brown}|
+-----+--------------------+



In [0]:
updatedDF.collect()

Out[66]: [Row(name=Row(firstname='James', middlename='', lastname='Smith'), id='36636', gender='M', salary=3100, SalaryBand='Medium'),
 Row(name=Row(firstname='Michael', middlename='Rose', lastname=''), id='40288', gender='M', salary=4300, SalaryBand='High'),
 Row(name=Row(firstname='Robert', middlename='', lastname='Williams'), id='42114', gender='M', salary=1400, SalaryBand='Low'),
 Row(name=Row(firstname='Maria', middlename='Anne', lastname='Jones'), id='39192', gender='F', salary=5500, SalaryBand='High'),
 Row(name=Row(firstname='Jen', middlename='Mary', lastname='Brown'), id='', gender='F', salary=-1, SalaryBand='Low')]

In [0]:
# Bring every row to the driver and print it as "id,salary".
for record in updatedDF.collect():
    print(",".join((record['id'], str(record['salary']))))

36636,3100
40288,4300
42114,1400
39192,5500
,-1


In [0]:
from pyspark.sql.functions import col, lit

df.withColumn("Country", lit("USA")).show()

+---------+----------+--------+-----+------+------+-------+
|firstname|middlename|lastname|   id|gender|salary|Country|
+---------+----------+--------+-----+------+------+-------+
|    James|          |   Smith|36636|     M|  3000|    USA|
|  Michael|      Rose|        |40288|     M|  4000|    USA|
|   Robert|          |Williams|42114|     M|  4000|    USA|
|    Maria|      Anne|   Jones|39192|     F|  4000|    USA|
|      Jen|      Mary|   Brown|     |     F|    -1|    USA|
+---------+----------+--------+-----+------+------+-------+



In [0]:
df.filter(df.gender != "M").show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [0]:
df.filter(df.firstname .contains("M")).show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|  Michael|      Rose|        |40288|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
+---------+----------+--------+-----+------+------+



In [0]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [0]:
df.sort(col("department"),col("state")).show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Raman        |Finance   |CA   |99000 |40 |24000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|James        |Sales     |NY   |90000 |34 |10000|
+-------------+----------+-----+------+---+-----+



In [0]:
df.orderBy(col("department"),col("state")).show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|James        |Sales     |NY   |90000 |34 |10000|
+-------------+----------+-----+------+---+-----+



In [0]:
df.createOrReplaceTempView("EMP")
spark.sql("select employee_name,department,state,salary,age,bonus from EMP ORDER BY department asc").show(truncate=False)


+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Scott        |Finance   |NY   |83000 |36 |19000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
+-------------+----------+-----+------+---+-----+



In [0]:
df.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)



In [0]:
df.groupBy(col("department")).sum("salary").show(truncate=False)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+



In [0]:
df.groupBy(col("department")).count().show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|   Finance|    4|
| Marketing|    2|
+----------+-----+



In [0]:
minsal=df.groupBy(col("department")).min("salary")
minsal.show()

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|     Sales|      81000|
|   Finance|      79000|
| Marketing|      80000|
+----------+-----------+



In [0]:
df.groupBy(col("department"),col("state")).count().show()

+----------+-----+-----+
|department|state|count|
+----------+-----+-----+
|     Sales|   NY|    2|
|     Sales|   CA|    1|
|   Finance|   CA|    2|
|   Finance|   NY|    2|
| Marketing|   NY|    1|
| Marketing|   CA|    1|
+----------+-----+-----+



In [0]:
from pyspark.sql.functions import sum,avg,max,min,mean,count
# Per-department summary: total and average salary, total and maximum bonus.
(
    df.groupBy("department")
    .agg(
        sum("salary").alias("sum_salary"),
        avg("salary").alias("avg_salary"),
        sum("bonus").alias("sum_bonus"),
        max("bonus").alias("max_bonus"),
    )
    .show(truncate=False)
)

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+



In [0]:
df.where(col("salary")>80000).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [0]:
emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----

In [0]:
# Inner join: keep only employees whose emp_dept_id matches a dept_id.
# Python DataFrames are aliased with .alias(); `as` is a reserved word in Python,
# so the Scala-style .as(...) cannot be used here.
empDF.alias('emp').join(
    deptDF.alias('dept'),
    col('emp.emp_dept_id') == col('dept.dept_id'),
    "inner",
).show(truncate=False)


In [0]:
# leftsemi join returns only the left dataset's columns, and only for rows that have a match in the right dataset
# on the join expression; rows without a match are dropped and no right-side columns are returned
empDF.alias('emp').join(deptDF.alias('dept'),col('emp.emp_dept_id')==col('dept.dept_id'),"leftsemi" ).show()



In [0]:
# leftanti join is the opposite of leftsemi: it returns only the left dataset's columns, and only for rows that have no match in the right dataset
empDF.alias('emp').join(deptDF.alias('dept'),col('emp.emp_dept_id')==col('dept.dept_id'),"leftanti" ).show()



In [0]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000) \
  ]

columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns2= ["employee_name","department","state","salary","age","bonus"]

df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

df2.printSchema()
df2.show(truncate=False)

unionDF = df.union(df2)
unionDF.show(truncate=False)
disDF = df.union(df2).distinct()
disDF.show(truncate=False)

unionAllDF = df.unionAll(df2)
unionAllDF.show(truncate=False)



In [0]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)



In [0]:
def convertCase(str):
  """Return the upper-cased form of ``str``; ``None`` is passed through unchanged.

  Spark hands SQL NULLs to Python UDFs as ``None``, so the guard keeps a null
  cell from raising ``AttributeError`` when this function backs a UDF.
  The parameter keeps its existing name (which shadows the builtin ``str``
  inside this function) so existing callers are unaffected.
  """
  if str is None:
    return None
  return str.upper()



In [0]:
convertCase("john jones")



In [0]:
""" Converting function to UDF """
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap convertCase as a Spark UDF so it can be applied to DataFrame columns.
# udf and StringType are imported here so the cell does not rely on the host
# environment providing them; the string return type is stated explicitly.
convertUDF = udf(convertCase, StringType())



In [0]:
df.select(col("Seqno"),convertUDF("Name").alias("NewName")).show(truncate=False)



In [0]:
df.withColumn("Cureated Name", convertUDF(col("Name"))) \
  .show(truncate=False)



In [0]:
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)



In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType


@udf(returnType=StringType())
def upperCase(str):
    """Upper-case a string column value; a SQL NULL (``None``) stays NULL."""
    if str is None:
        return None
    return str.upper()

df.withColumn("Cureated Name", upperCase(col("Name"))) \
.show(truncate=False)



In [0]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
 
    
#Flatmap    
rdd.flatMap(lambda x: x.split(" ")).collect()



In [0]:
rdd.map(lambda x: x.split(" ")).collect()



In [0]:
data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

rdd2=df.rdd.map(lambda x: 
    (x[0]+","+x[1],x[2],x[3]*2)
    )  
df2=rdd2.toDF(["name","gender","new_salary"]   )
df2.show()

#Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    ) 

#Referring Column Names
rdd2=df.rdd.map(lambda x: 
    (x.firstname+","+x.lastname,x.gender,x.salary*2)
    ) 

rdd2.collect()
def func1(x):
    """Turn one row into a (name, gender, salary) tuple.

    name is "firstname,lastname", gender is lower-cased and salary is doubled.
    """
    full_name = x.firstname + "," + x.lastname
    return (full_name, x.gender.lower(), x.salary * 2)

rdd3=df.rdd.map(lambda x: func1(x))
rdd3.collect()



In [0]:
rdd2.toDF(["name","gender","new_salary"] ).show()



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number



In [0]:
simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
empdf = spark.createDataFrame(data = simpleData, schema = columns)
empdf.printSchema()
empdf.show(truncate=False)



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number




In [0]:
windowspec = Window.partitionBy("department").orderBy("salary")

empdf.withColumn("row_nr",row_number().over(windowspec)).show(truncate=False)



In [0]:
filtered_df=empdf.withColumn("row_nr",row_number().over(windowspec))
filtered_df.where(col("row_nr")==1).show(truncate=False)



In [0]:
# Import the functions this cell uses explicitly so it does not depend on a
# wildcard import that only appears further down the notebook.
from pyspark.sql.functions import col, last, row_number

# NOTE(review): last() over an unordered select depends on row order, which is
# not guaranteed — confirm this is only meant as an API demonstration.
empdf.withColumn("row_nr", row_number().over(windowspec)) \
    .select(last(col("salary"))).show(truncate=False)



In [0]:
# rank is imported alongside dense_rank and row_number; all three are used below.
from pyspark.sql.functions import dense_rank, rank, row_number

# Compare the three ranking functions over the same window specification.
empdf.withColumn("row_nr", row_number().over(windowspec)) \
    .withColumn("rank", rank().over(windowspec)) \
    .withColumn("dense_rank", dense_rank().over(windowspec)).show()



In [0]:
from pyspark.sql.functions import cume_dist ,lag ,lead  
empdf.withColumn("cume_dist",cume_dist().over(windowspec)) \
.withColumn("lag",lag("salary",2).over(windowspec)) \
.withColumn("lead",lead("salary",2).over(windowspec)) \
.show()



In [0]:
windowSpecAgg  = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number 
empdf.withColumn("row",row_number().over(windowspec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()



In [0]:
# Import the functions this cell uses explicitly so it does not depend on a
# wildcard import that only appears further down the notebook.
from pyspark.sql.functions import col, last, row_number

# Build the frame once and reuse it for both displays.
# NOTE(review): windowSpecAgg partitions by department without an orderBy, so
# which salary last() picks within a department is not guaranteed — confirm
# whether an ordered window was intended.
tempdf = empdf.withColumn("row", row_number().over(windowspec)) \
  .withColumn("last_sal", last(col("salary")).over(windowSpecAgg))

tempdf.show()

tempdf.withColumn("salary_diff", col('last_sal') - col('salary')).show()



In [0]:
from pyspark.sql.functions import countDistinct 

empdf.select(countDistinct("department", "salary")).show()



In [0]:
from pyspark.sql.functions import *



In [0]:
empdf.select(max("salary")).show(truncate=False)




In [0]:
# Sum the elements with fold(). The operator must combine both of its arguments
# (and be associative and commutative): fold() applies it inside every partition
# and again when merging the per-partition results, so an operator that ignores
# its second argument only counts partitions.
spark.sparkContext.parallelize([1, 25, 8, 4, 2], 20).fold(0, lambda a, b: a + b)



In [0]:
sc.parallelize(list([1,2,3,4,5]),2).fold(10, lambda x,y:x+y)



In [0]:
'''
In the fold(10, ...) example above, the zero value 10 is used as the starting value in each of the 2 partitions and once more when the per-partition results are merged. The elements sum to 1+2+3+4+5 = 15, and the zero value contributes (2 partitions + 1 final merge) * 10 = 30, so fold returns 15 + 30 = 45.
'''



In [0]:
data = [('Project', 1),
('Gutenberg’s', 1),
('Alice’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1)]

rdd=spark.sparkContext.parallelize(data)

rdd2=rdd.reduceByKey(lambda a,b: a+b)
rdd2.collect()



In [0]:
source_df = spark.createDataFrame(
    [
        ("Jose", "BLUE"),
        ("lI", "BrOwN")
    ],
    ["name", "eye_color"]
)
source_df.show()



In [0]:
from functools import reduce

from pyspark.sql.functions import col, lower

# reduce(fun, seq, initial) threads an accumulator through seq: starting from
# source_df, each column name in turn replaces that column with its lower-cased
# version.
actual_df = reduce(
    lambda memo_df, col_name: memo_df.withColumn(col_name, lower(col(col_name))),
    source_df.columns,
    source_df
)

# show() prints the table itself and returns None, so it is not wrapped in print().
actual_df.show()



In [0]:
# dynamically add columns 
import pyspark.sql.functions as F
new_df=source_df
for x in ['start_time','end_time']:
  new_df = new_df.withColumn( str(x),F.current_date())
new_df.show()



In [0]:
linesrdd = spark.sparkContext.textFile("dbfs:/FileStore/shared_uploads/priya1cvr.pb@gmail.com/urls.txt")
linesrdd.collect()

Out[85]: ['https://gist.github.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b',
 'https://security.google.com/settings/security/apppasswords']

In [0]:
linesrdd = spark.sparkContext.textFile("dbfs:/FileStore/shared_uploads/priya1cvr.pb@gmail.com/urls.txt")
print(linesrdd.collect())
linelength = linesrdd.map(lambda s : len(s))
print(linelength.collect())
totalLength=linelength.reduce(lambda a,b: a+b)
totalLength

['https://gist.github.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b', 'https://security.google.com/settings/security/apppasswords']
[69, 58]
Out[87]: 127

In [0]:

linesrdd = spark.sparkContext.textFile("dbfs:/FileStore/tables/shared_uploads/priya1cvr.pb@gmail.com/cli_commands-1.txt")
print(linesrdd.take(5))
linelength = linesrdd.map(lambda s : len(s))
print(linelength.collect())
totalLength=linelength.reduce(lambda a,b: a+b)
totalLength

['CLI Commands', '', '', 'Installing Airflow', 'docker run -it --rm -p 8080:8080 python:3.8-slim /bin/bash']
[12, 0, 0, 18, 58, 142, 0, 0, 9, 26, 0, 0, 38, 123, 0, 0, 18, 63, 0, 0, 264, 68, 0, 0, 48, 94, 0, 0, 30, 75, 0, 0, 25, 74, 0, 0, 12, 18, 0, 0, 23, 38, 0, 0, 28, 42, 0, 0, 91, 87, 0, 0, 117, 0, 0, 248, 0, 0, 15, 29, 0, 0, 19, 41, 0, 0, 25, 19, 0, 88, 0, 0, 19, 41, 0, 0, 31, 128, 0, 45, 0, 25, 0, 0, 9, 32, 0, 0, 38, 74, 0, 0, 3, 38, 0, 0, 15, 29, 0, 0, 16, 49, 0, 0, 18, 56, 0, 0, 17, 27, 0, 0, 17, 27, 0, 0, 21, 91, 0, 0, 17, 85, 0, 0, 2, 53, 0, 0, 44, 81, 0, 0, 58, 187, 0, 0, 75, 214, 0, 0, 49, 59, 0, 0, 42, 63, 0, 0, 71, 163]
Out[90]: 4254

In [0]:
# Word count: read the file line by line, split each line on single spaces,
# pair every token with 1, then add up the 1s per token.
linesrdd = spark.sparkContext.textFile("dbfs:/FileStore/tables/shared_uploads/priya1cvr.pb@gmail.com/cli_commands-1.txt")
# NOTE: split(" ") yields an empty string for blank lines and for consecutive
# spaces, which is why '' appears as a "word" in the collected counts.
splitwords = linesrdd.flatMap(lambda x : x.split(" "))
results = splitwords.map(lambda x: (x,1))
totalcount = results.reduceByKey(lambda a,b : a+b)
totalcount.collect()

Out[93]: [('Commands', 1),
 ('', 83),
 ('Airflow', 6),
 ('docker', 8),
 ('run', 2),
 ('/bin/bash', 5),
 ('*', 35),
 ('Create', 3),
 ('start', 1),
 ('container', 1),
 ('image', 3),
 ('execute', 1),
 ('command', 3),
 ('in', 12),
 ('have', 1),
 ('shell', 2),
 ('python', 3),
 ('Print', 2),
 ('Python', 1),
 ('version', 3),
 ('export', 1),
 ('Export', 1),
 ('variable', 2),
 ('used', 1),
 ('store', 1),
 ('dags', 8),
 ('configuration', 1),
 ('|', 2),
 ('airflow', 24),
 ('check', 2),
 ('exported', 1),
 ('update', 1),
 ('-y', 2),
 ('install', 4),
 ('wget', 2),
 ('libczmq-dev', 1),
 ('libssl-dev', 1),
 ('git', 1),
 ('bind9utils', 1),
 ('freetds-dev', 1),
 ('libkrb5-dev', 1),
 ('libffi-dev', 1),
 ('libpq-dev', 1),
 ('apt-utils', 1),
 ('zip', 1),
 ('unzip', 1),
 ('gcc', 1),
 ('Install', 2),
 ('tools', 1),
 ('${AIRFLOW_HOME}', 1),
 ('airflow,', 1),
 ('set', 2),
 ('home', 1),
 ('directory', 3),
 ('value', 1),
 ('of', 9),
 ('into', 3),
 ('cat', 1),
 ('/etc/passwd', 2),
 ('created', 1),
 ('pip', 4),
 (

In [0]:
# Four pairs spread over 4 slices; the glom() output in the next cell shows each
# pair sitting in its own partition.
data = sc.parallelize([("a",1),("b",2),("a",2),("b",3)],4)
# aggregateByKey(zeroValue, seqFunc, combFunc): the zero value 1 seeds the
# accumulator for a key in every partition where that key occurs, so here it is
# added twice per key: a = (1+1) + (1+2) = 5, b = (1+2) + (1+3) = 7.
data.aggregateByKey(1, lambda acc,val: acc+val,
                   lambda acc1,acc2: acc1+acc2).collect()

Out[109]: [('b', 7), ('a', 5)]

In [0]:
data.glom().collect()

Out[108]: [[('a', 1)], [('b', 2)], [('a', 2)], [('b', 3)]]

In [0]:
data.reduceByKey(lambda a,b: a+b).collect()

Out[110]: [('b', 5), ('a', 3)]

In [0]:
data.reduceByKey(lambda a,b: a+b,10).collect()

Out[111]: [('b', 5), ('a', 3)]