# chapter 5 

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("c5").getOrCreate()

In [2]:
!netstat -anp |grep 4040 

tcp6       0      0 :::4040                 :::*                    LISTEN      8998/java           


In [3]:
json_df = spark.read.json('/root/golive/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json')

In [4]:
json_df.describe()

DataFrame[summary: string, DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: string]

In [5]:
json_df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [22]:
json_df.schema
json_df.limit(4).collect()

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)]

In [7]:
# in general for ETL , 
# its recommended to predefine the schema and not entirely rely on Spark to infer the schema on read.

from pyspark.sql.types import StructType,StructField,StringType,LongType

In [8]:
mymanualschema = StructType([ StructField(" DEST_COUNTRY_NAME", StringType(), True)\
                             , StructField(" ORIGIN_COUNTRY_NAME", StringType(), True)\
                             , StructField(" count", LongType(), False, metadata ={" hello":" world"})
                            ])


In [26]:
json_df1 = spark.read.json('/root/golive/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json',schema=mymanualschema ,multiLine=True)


json_df2 = spark.read.format("json")\
.schema(mymanualschema)\
.load('/root/golive/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json')


In [27]:
json_df1.schema
json_df2.schema

StructType(List(StructField( DEST_COUNTRY_NAME,StringType,true),StructField( ORIGIN_COUNTRY_NAME,StringType,true),StructField( count,LongType,true)))

In [28]:
json_df1.limit(5).collect()
json_df2.limit(5).collect()

[Row( DEST_COUNTRY_NAME=None,  ORIGIN_COUNTRY_NAME=None,  count=None),
 Row( DEST_COUNTRY_NAME=None,  ORIGIN_COUNTRY_NAME=None,  count=None),
 Row( DEST_COUNTRY_NAME=None,  ORIGIN_COUNTRY_NAME=None,  count=None),
 Row( DEST_COUNTRY_NAME=None,  ORIGIN_COUNTRY_NAME=None,  count=None),
 Row( DEST_COUNTRY_NAME=None,  ORIGIN_COUNTRY_NAME=None,  count=None)]

In [11]:
#functions 
from pyspark.sql.functions import col 
x= col("mycolumnname")

In [12]:
type(x)  # A column outside DF ??

pyspark.sql.column.Column

In [13]:
json_df.select('DEST_COUNTRY_NAME').show()


+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|       United States|
|       United States|
|       United States|
|               Egypt|
|       United States|
|       United States|
|       United States|
|          Costa Rica|
|             Senegal|
|             Moldova|
|       United States|
|       United States|
|              Guyana|
|               Malta|
|            Anguilla|
|             Bolivia|
|       United States|
|             Algeria|
|Turks and Caicos ...|
|       United States|
+--------------------+
only showing top 20 rows



In [14]:
json_df1.columns

[' DEST_COUNTRY_NAME', ' ORIGIN_COUNTRY_NAME', ' count']

In [15]:
json_df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [16]:
from pyspark.sql import Row
    

In [17]:
newrow = Row('country1','India',33)

In [18]:
type(newrow)

pyspark.sql.types.Row

In [19]:
json_df.createOrReplaceTempView("t1")

In [20]:
sql = "select * from t1"
spark.sql(sql).take(10)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [21]:
# create data frame  manually 
#steps:
# 1. Create schema struture 
# 2. Create a row using Row

#from pyspark.sql import Row
#from pyspark.sql.types import StructType,StructField,StringType,LongType


In [22]:
myschema = StructType([StructField("f1", StringType(), True)\
                      ,StructField("f2",StringType(), True)\
                      ,StructField("f3",LongType(),True)
                       ])





In [23]:
myrow = Row('Sajin','vk',99)

In [24]:
df = spark.createDataFrame([myrow],myschema)

#myDf = spark.createDataFrame([ myrow], myschema)



In [25]:
df.show()

+-----+---+---+
|   f1| f2| f3|
+-----+---+---+
|Sajin| vk| 99|
+-----+---+---+



In [26]:
from pyspark.sql.functions import col ,expr ,column

In [27]:
json_df.columns


['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [28]:
json_df.select(expr('DEST_COUNTRY_NAME'),
              col ('DEST_COUNTRY_NAME'),
              column('DEST_COUNTRY_NAME')).show(5)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
|    United States|    United States|    United States|
|            Egypt|            Egypt|            Egypt|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 5 rows



In [29]:
json_df.select(expr('DEST_COUNTRY_NAME as dd'),
              'DEST_COUNTRY_NAME',
              ).show(5)

+-------------+-----------------+
|           dd|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
|United States|    United States|
|        Egypt|            Egypt|
|United States|    United States|
+-------------+-----------------+
only showing top 5 rows



In [30]:
json_df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withincountry" ).show()

+--------------------+-------------------+-----+-------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withincountry|
+--------------------+-------------------+-----+-------------+
|       United States|            Romania|   15|        false|
|       United States|            Croatia|    1|        false|
|       United States|            Ireland|  344|        false|
|               Egypt|      United States|   15|        false|
|       United States|              India|   62|        false|
|       United States|          Singapore|    1|        false|
|       United States|            Grenada|   62|        false|
|          Costa Rica|      United States|  588|        false|
|             Senegal|      United States|   40|        false|
|             Moldova|      United States|    1|        false|
|       United States|       Sint Maarten|  325|        false|
|       United States|   Marshall Islands|   39|        false|
|              Guyana|      United States|   64|       

In [31]:
sql = "select * , (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withincountry from t1  "
spark.sql(sql).show()

+--------------------+-------------------+-----+-------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withincountry|
+--------------------+-------------------+-----+-------------+
|       United States|            Romania|   15|        false|
|       United States|            Croatia|    1|        false|
|       United States|            Ireland|  344|        false|
|               Egypt|      United States|   15|        false|
|       United States|              India|   62|        false|
|       United States|          Singapore|    1|        false|
|       United States|            Grenada|   62|        false|
|          Costa Rica|      United States|  588|        false|
|             Senegal|      United States|   40|        false|
|             Moldova|      United States|    1|        false|
|       United States|       Sint Maarten|  325|        false|
|       United States|   Marshall Islands|   39|        false|
|              Guyana|      United States|   64|       

In [32]:
#from pyspark.sql.functions import avg , count
json_df.selectExpr("avg(count)"\
                   ,"count(distinct(DEST_COUNTRY_NAME))"\
                   ).show()



+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [33]:
sql = "select avg(count), count(distinct(DEST_COUNTRY_NAME)) from t1  "
#sql = "select count from t1"
spark.sql(sql).show()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [34]:
#literals 

sql = "select * , 1 as newcolumn from t1"
spark.sql(sql).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|newcolumn|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [35]:

from pyspark.sql.functions import lit 


In [36]:
json_df.selectExpr("*"\
                   , "1 as newcolumn").show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|newcolumn|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [37]:
json_df.select(expr("*")\
                   , lit(1).alias("newcolumn")).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|newcolumn|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [38]:
# ADDING Columns :
# withColumn 

json_df.withColumn("newcolumn",lit("2")).show(2)


+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|newcolumn|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        2|
|    United States|            Croatia|    1|        2|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [39]:
json_df.withColumn("newcolumn",expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME ")).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|newcolumn|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|    false|
|    United States|            Croatia|    1|    false|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [40]:
#Renaming columns 
json_df.withColumnRenamed("DEST_COUNTRY_NAME","DEST").show(2)

+-------------+-------------------+-----+
|         DEST|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows



In [41]:
#removing COLUMS df.drop("col1","col2")
json_df.drop("count").show(3)



+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
|    United States|            Ireland|
+-----------------+-------------------+
only showing top 3 rows



In [42]:
json_df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [43]:
# cast is used for type conversion 

#CAST 

# Filtering ROWS 

In [44]:
#where and #filter 
json_df.where("count > 1000").show(5)


+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|           Mexico|      United States|  7140|
|    United States| Dominican Republic|  1420|
|    United States|      United States|370002|
|          Germany|      United States|  1468|
|           Canada|      United States|  8399|
+-----------------+-------------------+------+
only showing top 5 rows



In [45]:
sql = "select * from t1 where count > 1000"
spark.sql(sql).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|           Mexico|      United States|  7140|
|    United States| Dominican Republic|  1420|
|    United States|      United States|370002|
|          Germany|      United States|  1468|
|           Canada|      United States|  8399|
+-----------------+-------------------+------+
only showing top 5 rows



In [46]:
json_df.where("count > 1000").where("DEST_COUNTRY_NAME != 'United States'").show(5)

#same as  SQL AND 
# select * from t1 where count > 1000 and Dest_country_Name != 'United States'

+------------------+-------------------+-----+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+------------------+-------------------+-----+
|            Mexico|      United States| 7140|
|           Germany|      United States| 1468|
|            Canada|      United States| 8399|
|Dominican Republic|      United States| 1353|
|             Japan|      United States| 1548|
+------------------+-------------------+-----+
only showing top 5 rows



In [47]:
#Distinct DEST_COUNTRY_NAME and ORIGIN_COUNTRY_NAME 

sql = "select count(distinct(DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME)) as count_distinct from t1 "
spark.sql(sql).show()

+--------------+
|count_distinct|
+--------------+
|           256|
+--------------+



In [48]:
json_df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").distinct().count()

256

In [49]:
json_df.select("ORIGIN_COUNTRY_NAME").distinct().count()

125

In [50]:
json_df.count()

256

In [51]:
json_df
seed = 5 
withReplacement = False 
fraction = 0.5 
json_df.sample( withReplacement, fraction, seed).count()



138

In [52]:
newdf=json_df.randomSplit([0.25 ,0.75],seed=None)
newdf[0].count()


55

In [53]:
newdf[1].count()

201

In [55]:
# Sort data 
# Sort or Order BY 
from pyspark.sql.functions import asc, desc 

#Advanced Options 
#asc_nulls_first,
#desc_nulls_first, 
#asc_nulls_last, 
#desc_nulls_last



json_df.orderBy(expr("count desc")).show(5)

#SQL Equivalent 
# sql = "select * from t1 order by count desc "

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [56]:
json_df.rdd.getNumPartitions()

1

In [57]:
# REpartition only when the number of partitions needs to be increased 
new_repartion_df = json_df.repartition(5)
new_repartion_df.rdd.getNumPartitions()

5

In [58]:
json_df.repartition(5,col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [59]:
spark.stop()