In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=25a0983c3e88f08d01454996e9ac8e546f1eaa705294a86c7032cf9f690e98b0
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [3]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
import pyspark
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [4]:
from google.colab import files
files.upload()

Saving people.json to people.json


{'people.json': b'{"name":"Michael"}\n{"name":"Andy", "age":30}\n{"name":"Justin", "age":19}\n{"name":"Mary", "age":29}\n{"name":"John"}\n{"name":"Juhi", "age":36}\n{"name":"Jasmeen", "age":13}\n{"name":"Seeta", "age":12}\n{"age":45}\n{"name":"Mohan", "age":34}\n{"name":"Kayen", "age":18}\n{"age":67}\n{"name":"Caley", "age":21}\n{"name":"Shyam", "age":19}\n'}

In [5]:
r=spark.read.json("people.json")
r.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
|  29|   Mary|
|NULL|   John|
|  36|   Juhi|
|  13|Jasmeen|
|  12|  Seeta|
|  45|   NULL|
|  34|  Mohan|
|  18|  Kayen|
|  67|   NULL|
|  21|  Caley|
|  19|  Shyam|
+----+-------+



In [6]:
#1. Count number of records
r.count()

14

In [7]:
#create a new column with the simple copy of age column.
r.withColumn("age_copy",r.age).show()

+----+-------+--------+
| age|   name|age_copy|
+----+-------+--------+
|NULL|Michael|    NULL|
|  30|   Andy|      30|
|  19| Justin|      19|
|  29|   Mary|      29|
|NULL|   John|    NULL|
|  36|   Juhi|      36|
|  13|Jasmeen|      13|
|  12|  Seeta|      12|
|  45|   NULL|      45|
|  34|  Mohan|      34|
|  18|  Kayen|      18|
|  67|   NULL|      67|
|  21|  Caley|      21|
|  19|  Shyam|      19|
+----+-------+--------+



In [8]:
r.withColumn("new_age",r.age+1).show()

+----+-------+-------+
| age|   name|new_age|
+----+-------+-------+
|NULL|Michael|   NULL|
|  30|   Andy|     31|
|  19| Justin|     20|
|  29|   Mary|     30|
|NULL|   John|   NULL|
|  36|   Juhi|     37|
|  13|Jasmeen|     14|
|  12|  Seeta|     13|
|  45|   NULL|     46|
|  34|  Mohan|     35|
|  18|  Kayen|     19|
|  67|   NULL|     68|
|  21|  Caley|     22|
|  19|  Shyam|     20|
+----+-------+-------+



In [13]:
#4. Show the records where age column is missing
r.filter(r.age.isNull()).show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|NULL|   John|
+----+-------+



In [36]:
#5. Fetch the records having atleast one non-null values.
r.na.drop(thresh=1).show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
|  29|   Mary|
|NULL|   John|
|  36|   Juhi|
|  13|Jasmeen|
|  12|  Seeta|
|  45|   NULL|
|  34|  Mohan|
|  18|  Kayen|
|  67|   NULL|
|  21|  Caley|
|  19|  Shyam|
+----+-------+



In [29]:
#6. Fill the missing values of age column with mean.
from pyspark.sql.functions import mean
g=r.select(mean("age")).collect()[0][0]
r.na.fill(g,subset="age").show()

+---+-------+
|age|   name|
+---+-------+
| 28|Michael|
| 30|   Andy|
| 19| Justin|
| 29|   Mary|
| 28|   John|
| 36|   Juhi|
| 13|Jasmeen|
| 12|  Seeta|
| 45|   NULL|
| 34|  Mohan|
| 18|  Kayen|
| 67|   NULL|
| 21|  Caley|
| 19|  Shyam|
+---+-------+



In [31]:
#7. Write a sql query to find the people with age greater than 19.
r.createOrReplaceTempView("people")
spark.sql("select * from people where age>19").show()

+-----+
| name|
+-----+
| Andy|
| Mary|
| Juhi|
| NULL|
|Mohan|
| NULL|
|Caley|
+-----+

