In [1]:
from pyspark.sql import SparkSession
# Reuse the already-running SparkSession if there is one; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [12]:
# Word count over a local text file: split each line into words, pair every
# word with 1, then add up the 1s per distinct word -> (word, count) pairs.
# NOTE(review): hard-coded machine-local path; adjust before running elsewhere.
rdd1 = spark.sparkContext.textFile("file:///home/hadoop/test.txt")
# str.split() with no separator splits on any run of whitespace and never
# yields empty strings. split(" ") would count "" as a word for blank lines
# and repeated spaces, and would not split on tabs.
rdd1_2 = rdd1.flatMap(lambda line: line.split())
rdd1_3 = rdd1_2.map(lambda word: (word, 1))
rdd1_4 = rdd1_3.reduceByKey(lambda a, b: a + b)

In [13]:
# Flip each (word, count) pair to (count, word) so that sortByKey orders the
# words by frequency, lowest count first.
rdd1_5 = (
    rdd1_4
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey()
)

                                                                                

In [14]:
# Pull every (count, word) pair back to the driver and print the list.
print(
    rdd1_5.collect()
)

[(9, 'Project'), (9, 'Gutenberg’s'), (18, 'Alice’s'), (18, 'in'), (18, 'Lewis'), (18, 'Carroll'), (18, 'Adventures'), (18, 'Wonderland'), (18, 'by'), (27, 'is'), (27, 'use'), (27, 'of'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'no'), (27, 'This'), (27, 'eBook'), (27, 'for'), (27, 'the'), (27, 'cost'), (27, 'and'), (27, 'with')]


In [15]:
# Four numbered quotes; Seqno is stored as a string, not an integer.
columns = ["Seqno", "Quote"]
data = [
    ("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy."),
    ("4", "Be cool."),
]
df = spark.createDataFrame(data=data, schema=columns)
df.show()


+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+



In [16]:
# Same default row limit (20), but print the full Quote text instead of truncating it.
df.show(n=20, truncate=False)

+-----+-----------------------------------------------------------------------------+
|Seqno|Quote                                                                        |
+-----+-----------------------------------------------------------------------------+
|1    |Be the change that you wish to see in the world                              |
|2    |Everyone thinks of changing the world, but no one thinks of changing himself.|
|3    |The purpose of our lives is to be happy.                                     |
|4    |Be cool.                                                                     |
+-----+-----------------------------------------------------------------------------+



In [17]:
# (department name, department id) tuples distributed as an RDD.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
rdd = spark.sparkContext.parallelize(dept)

In [18]:
# No schema given, so Spark names the columns positionally (_1, _2) and infers types.
df = rdd.toDF(schema=None)

In [19]:
# Print the inferred schema as a tree (column name, type, nullability).
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [20]:
# Default display: up to 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

+---------+---+
|       _1| _2|
+---------+---+
|  Finance| 10|
|Marketing| 20|
|    Sales| 30|
|       IT| 40|
+---------+---+



In [21]:
# Same RDD as before, this time with explicit column names instead of _1/_2.
deptColumns = ["dept_name", "dept_id"]
df2 = rdd.toDF(schema=deptColumns)


In [24]:
# Up to 20 rows with default truncation.
df2.show(n=20, truncate=True)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



In [25]:
# People rows: (firstname, lastname, country, state).
data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL"),
]

# A plain list of names serves as the schema; Spark infers the column types.
columns = ["firstname", "lastname", "country", "state"]

df = spark.createDataFrame(data, columns)
df.show(truncate=False)



+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+



In [26]:
# Select columns by name, passed as a single list.
df.select(["firstname", "lastname"]).show(n=3)

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
+---------+--------+
only showing top 3 rows



In [27]:
# Select columns through df["<name>"] Column lookups.
df.select(*[df[name] for name in ("firstname", "lastname")]).show(n=3)

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
+---------+--------+
only showing top 3 rows



In [28]:
# Select columns through attribute access (df.<name>), passed as a list.
df.select([df.firstname, df.lastname]).show(n=3)

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
+---------+--------+
only showing top 3 rows



In [29]:
# Select the first four column names (all of them for this DataFrame).
df.select(*df.columns[:4]).show(n=20)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [31]:
from pyspark.sql.types import StructType, StructField, StringType

# Each row is (name, state, gender), where name is a
# (firstname, middlename, lastname) tuple that may hold None or "".
data = [
    (("James", None, "Smith"), "OH", "M"),
    (("Anna", "Rose", ""), "NY", "F"),
    (("Julia", "", "Williams"), "OH", "F"),
    (("Maria", "Anne", "Jones"), "NY", "M"),
    (("Jen", "Mary", "Brown"), "NY", "M"),
    (("Mike", "Mary", "Williams"), "OH", "M"),
]

# Explicit schema: "name" is a nested struct of three nullable strings.
schema = StructType([
    StructField("name", StructType([
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
    ])),
    StructField("state", StringType(), True),
    StructField("gender", StringType(), True),
])
df2 = spark.createDataFrame(data, schema)
df2.show(n=3)


+--------------------+-----+------+
|                name|state|gender|
+--------------------+-----+------+
|{James, null, Smith}|   OH|     M|
|      {Anna, Rose, }|   NY|     F|
| {Julia, , Williams}|   OH|     F|
+--------------------+-----+------+
only showing top 3 rows



In [32]:
# Show the whole nested "name" struct column without truncation.
df2.select(df2["name"]).show(n=3, truncate=False)

+--------------------+
|name                |
+--------------------+
|{James, null, Smith}|
|{Anna, Rose, }      |
|{Julia, , Williams} |
+--------------------+
only showing top 3 rows



In [33]:
# Use dotted paths to reach individual fields inside the struct.
df2.select(["name.firstname", "name.lastname"]).show(n=3, truncate=False)

+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
+---------+--------+
only showing top 3 rows



In [34]:
# "name.*" expands every struct field into its own top-level column.
df2.select(["name.*", "gender"]).show(n=3)

+---------+----------+--------+------+
|firstname|middlename|lastname|gender|
+---------+----------+--------+------+
|    James|      null|   Smith|     M|
|     Anna|      Rose|        |     F|
|    Julia|          |Williams|     F|
+---------+----------+--------+------+
only showing top 3 rows



In [35]:
# Employee rows: (employee_name, department, state, salary, age, bonus).
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]

# Build the DataFrame: column names only, so Spark infers string/long types.
schema = ["employee_name", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(simpleData, schema)
df.printSchema()
df.show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [44]:
from pyspark.sql.functions import sum,avg,max

In [37]:
# del sum

In [38]:
# sum

<function sum(iterable, /, start=0)>

In [45]:
# Per-department salary total (aliased) and mean (left with Spark's default
# column name "avg(salary)").
# Uses module-qualified Spark functions rather than bare `sum`/`avg`. A bare
# `sum` only works while an earlier cell has shadowed Python's builtin `sum`.
# Once the builtin is back (e.g. after `del sum`), `sum('salary')` would call
# the builtin and raise a TypeError.
from pyspark.sql import functions as F

tmp = (
    df.groupBy("department")
    .agg(
        F.sum("salary").alias("sum_salary"),
        F.avg("salary"),
    )
)


In [46]:
# Print only the physical plan; "simple" is the default explain mode.
tmp.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#196], functions=[sum(salary#198L), avg(salary#198L)])
   +- Exchange hashpartitioning(department#196, 200), ENSURE_REQUIREMENTS, [plan_id=167]
      +- HashAggregate(keys=[department#196], functions=[partial_sum(salary#198L), partial_avg(salary#198L)])
         +- Project [department#196, salary#198L]
            +- Scan ExistingRDD[employee_name#195,department#196,state#197,salary#198L,age#199L,bonus#200L]




In [47]:
# Show the first three aggregated rows.
tmp.show(n=3)

+----------+----------+-----------------+
|department|sum_salary|      avg(salary)|
+----------+----------+-----------------+
|     Sales|    257000|85666.66666666667|
|   Finance|    351000|          87750.0|
| Marketing|    171000|          85500.0|
+----------+----------+-----------------+



In [48]:
# Per-department salary total and mean, both with explicit column aliases.
# Uses module-qualified Spark functions rather than bare `sum`/`avg`. A bare
# `sum` only works while an earlier cell has shadowed Python's builtin `sum`.
# Once the builtin is back (e.g. after `del sum`), `sum('salary')` would call
# the builtin and raise a TypeError.
from pyspark.sql import functions as F

tmp = (
    df.groupBy("department")
    .agg(
        F.sum("salary").alias("sum_salary"),
        F.avg("salary").alias("avg_salary"),
    )
)


In [49]:
# Physical plan only; aliases do not change the plan shape.
tmp.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#196], functions=[sum(salary#198L), avg(salary#198L)])
   +- Exchange hashpartitioning(department#196, 200), ENSURE_REQUIREMENTS, [plan_id=226]
      +- HashAggregate(keys=[department#196], functions=[partial_sum(salary#198L), partial_avg(salary#198L)])
         +- Project [department#196, salary#198L]
            +- Scan ExistingRDD[employee_name#195,department#196,state#197,salary#198L,age#199L,bonus#200L]




In [50]:
# Show the first three aggregated rows with the aliased column names.
tmp.show(n=3)

+----------+----------+-----------------+
|department|sum_salary|       avg_salary|
+----------+----------+-----------------+
|     Sales|    257000|85666.66666666667|
|   Finance|    351000|          87750.0|
| Marketing|    171000|          85500.0|
+----------+----------+-----------------+

