In [0]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate() 

In [0]:
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd=spark.sparkContext.parallelize(dataList)

In [0]:
rdd.count()

Out[3]: 3

In [0]:
print(rdd)

ParallelCollectionRDD[0] at readRDDFromInputStream at PythonRDD.scala:435


In [0]:
rdd.collect()

Out[5]: [('Java', 20000), ('Python', 100000), ('Scala', 3000)]

In [0]:
rdd.first()

Out[6]: ('Java', 20000)

In [0]:
rdd.max()

Out[7]: ('Scala', 3000)

Spark Data frame

In [0]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [0]:
df.head()

Out[9]: Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000)

In [0]:
df.head(4)

Out[10]: [Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000),
 Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000),
 Row(firstname='Robert', middlename='', lastname='Williams', dob='1978-09-05', gender='M', salary=4000),
 Row(firstname='Maria', middlename='Anne', lastname='Jones', dob='1967-12-01', gender='F', salary=4000)]

In [0]:
df.count()

Out[11]: 5

In [0]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [0]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [0]:
df.select('dob').printSchema()

root
 |-- dob: string (nullable = true)



In [0]:
df.select('dob').show()

+----------+
|       dob|
+----------+
|1991-04-01|
|2000-05-19|
|1978-09-05|
|1967-12-01|
|1980-02-17|
+----------+



In [0]:
newColumns = ["newCol1","newCol2","newCol3"]
df.toDF(*newColumns).printSchema()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-3005923524009097>:2[0m
[1;32m      1[0m newColumns [38;5;241m=[39m [[38;5;124m"[39m[38;5;124mnewCol1[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mnewCol2[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mnewCol3[39m[38;5;124m"[39m]
[0;32m----> 2[0m df[38;5;241m.[39mtoDF([38;5;241m*[39mnewColumns)[38;5;241m.[39mprintSchema()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkw

In [0]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [0]:
df.filter(df.firstname == "Maria").show(truncate=False)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
+---------+----------+--------+----------+------+------+



In [0]:
li=["James","Robert","Maria"]
df.filter(df.firstname.isin(li)).show()


+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
+---------+----------+--------+----------+------+------+



In [0]:

data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()


root
 |-- name.fname: string (nullable = true)
 |-- gender: long (nullable = true)



In [0]:
df.show()

+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+



In [0]:
# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()

+------+
|gender|
+------+
|    23|
|    40|
+------+

+------+
|gender|
+------+
|    23|
|    40|
+------+



In [0]:
#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()


+------+
|gender|
+------+
|    23|
|    40|
+------+



In [0]:
df.select([col for col in df.columns]).show()


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3408317380716865>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mselect[49m[43m([49m[43m[[49m[43mcol[49m[43m [49m[38;5;28;43;01mfor[39;49;00m[43m [49m[43mcol[49m[43m [49m[38;5;129;43;01min[39;49;00m[43m [49m[43mdf[49m[38;5;241;43m.[39;49m[43mcolumns[49m[43m][49m[43m)[49m[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[

In [0]:
df.columns

Out[38]: ['name.fname', 'gender']

In [0]:
df.select("*").show()

+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+



In [0]:
df.printSchema()

root
 |-- name.fname: string (nullable = true)
 |-- gender: long (nullable = true)



In [0]:
df.withColumn("gender",col("gender").cast("Integer")).printSchema()

root
 |-- name.fname: string (nullable = true)
 |-- gender: integer (nullable = true)



In [0]:
from pyspark.sql.functions import lit
df.withColumn("Country", lit("USA")).show()

+----------+------+-------+
|name.fname|gender|Country|
+----------+------+-------+
|     James|    23|    USA|
|       Ann|    40|    USA|
+----------+------+-------+



In [0]:
df.show()

+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+



In [0]:
df.withColumnRenamed('gender','age')

Out[46]: DataFrame[name.fname: string, age: bigint]

In [0]:
df.show()

+----------+------+
|name.fname|gender|
+----------+------+
|     James|    23|
|       Ann|    40|
+----------+------+



In [0]:
# Using SQL Expression
from pyspark.sql.functions import col
df = df.withColumnRenamed('name.fname','name')
df.show()

+-----+------+
| name|gender|
+-----+------+
|James|    23|
|  Ann|    40|
+-----+------+



In [0]:
df.select('name').show()

+-----+
| name|
+-----+
|James|
|  Ann|
+-----+



In [0]:
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]

# Create DataFrame
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [0]:
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)

Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [0]:
dropDisDF = df.dropDuplicates(["department","salary"])
print("Distinct count of department & salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)

Distinct count of department & salary : 8
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Kumar        |Marketing |2000  |
|Jeff         |Marketing |3000  |
|James        |Sales     |3000  |
|Robert       |Sales     |4100  |
|Michael      |Sales     |4600  |
+-------------+----------+------+

