In [1]:
import pyspark
from pyspark.sql import SparkSession,types

In [2]:
spark = SparkSession.builder.appName('spark-api').enableHiveSupport().getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/04/20 21:06:55 WARN Utils: Your hostname, MSI resolves to a loopback address: 127.0.1.1; using 172.21.94.33 instead (on interface eth0)
23/04/20 21:06:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/20 21:06:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### `Schema`

In [3]:
# defining schema using DDL
schema = "`State` string, `Color` string, `Count` int"


# alternative way to define schema - programatically
# schema = types.StructType([
#             types.StructField("State", types.StringType(),False),
#             types.StructField("Color", types.StringType(),False),
#             types.StructField("Count", types.IntegerType(),False)
# ])

df = spark.read.csv('data/mnm.csv',schema=schema,header=True)

In [15]:
df.printSchema()

root
 |-- State: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Count: integer (nullable = true)



In [16]:
df.show()

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   TX|   Red|   20|
|   NV|  Blue|   66|
|   CO|  Blue|   79|
|   OR|  Blue|   71|
|   WA|Yellow|   93|
|   WY|  Blue|   16|
|   CA|Yellow|   53|
|   WA| Green|   60|
|   OR| Green|   71|
|   TX| Green|   68|
|   NV| Green|   59|
|   AZ| Brown|   95|
|   WA|Yellow|   20|
|   AZ|  Blue|   75|
|   OR| Brown|   72|
|   NV|   Red|   98|
|   WY|Orange|   45|
|   CO|  Blue|   52|
|   TX| Brown|   94|
|   CO|   Red|   82|
+-----+------+-----+
only showing top 20 rows



In [17]:
df.schema

StructType([StructField('State', StringType(), True), StructField('Color', StringType(), True), StructField('Count', IntegerType(), True)])

### `Columns`

In [45]:
from pyspark.sql import functions as F

In [32]:
# using expr func
df.select("State","Count",F.expr("Count*5").alias('Count_New')).show(5)

+-----+-----+---------+
|State|Count|Count_New|
+-----+-----+---------+
|   TX|   20|      100|
|   NV|   66|      330|
|   CO|   79|      395|
|   OR|   71|      355|
|   WA|   93|      465|
+-----+-----+---------+
only showing top 5 rows



In [31]:
# using col functions
df.select("State","Count",F.col("Count")*5).show(5)

+-----+-----+-----------+
|State|Count|(Count * 5)|
+-----+-----+-----------+
|   TX|   20|        100|
|   NV|   66|        330|
|   CO|   79|        395|
|   OR|   71|        355|
|   WA|   93|        465|
+-----+-----+-----------+
only showing top 5 rows



In [34]:
df.sort(F.desc("Count")).show(5)

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   UT|   Red|  100|
|   NM|Orange|  100|
|   NV|   Red|  100|
|   TX|   Red|  100|
|   UT|Yellow|  100|
+-----+------+-----+
only showing top 5 rows



In [39]:
df.sort(F.col("Count").desc()).show(5)

+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   UT|   Red|  100|
|   NM|Orange|  100|
|   NV|   Red|  100|
|   TX|   Red|  100|
|   UT|Yellow|  100|
+-----+------+-----+
only showing top 5 rows



### `Row`

In [40]:
from pyspark.sql import Row

In [44]:
new_row = [Row('BH','Blue',23)]

In [45]:
new_row

[<Row('BH', 'Blue', 23)>]

In [46]:
new_df = spark.createDataFrame(new_row,schema=schema)

In [47]:
new_df.show()

+-----+-----+-----+
|State|Color|Count|
+-----+-----+-----+
|   BH| Blue|   23|
+-----+-----+-----+



In [48]:
df_mod = df.union(new_df)

In [52]:
df_mod.tail(5)

[Row(State='UT', Color='Orange', Count=87),
 Row(State='WA', Color='Blue', Count=31),
 Row(State='OR', Color='Brown', Count=78),
 Row(State='NV', Color='Orange', Count=73),
 Row(State='BH', Color='Blue', Count=23)]

### `Read,Write`

In [55]:
df_fire = spark.read.csv('/home/snehil/snehi/Desktop/Data/fire.csv',inferSchema=True,header=True)

                                                                                

In [56]:
df_fire.show(5)

+---------------+---------------+--------+--------------------+-------------------+-----------+-------------------+-------------------+-------------------+----+-------+---------+------------+----+-----------------+---------------------+---------+-------------+-----------+---------------+-------------------+-----------------------+-----------------------+---------------+-------------+-------------------+-----------------+----------------+--------------------+----------+--------------------+----------------------+------------------+--------------------------+--------------------+-------------------+--------------+-----------------------+-------------------------+-----------+------------------+--------------------------------------+--------------+----------------+--------------------+-----------+--------------+------------------------------------+----------------------------------------+----------------------------------+------------------------------------+-----------------+-------------

### `Projections and Filters`

In [77]:
df_fire.filter(F.col("neighborhood_district")=='Potrero Hill').show(5,truncate=False)

+---------------+---------------+--------+-----------------+-------------------+-----------+-------------------+-------------------+-------------------+----+-------+---------+------------+----+-----------------+---------------------+---------+-------------+-----------+---------------+-------------------+-----------------------+-----------------------+---------------+-------------+-------------------+-----------------+----------------+---------------------------------------------+----------+--------------------+----------------------+------------------+--------------------------+---------------------------------------+------------------------------------------+------------------------------------------+-----------------------+-------------------------+-----------------+------------------------------------------+--------------------------------------+--------------+----------------+--------------------+-----------+--------------+------------------------------------+----------------------

In [79]:
# different count of neighborhood_district

df_fire.where(F.col("neighborhood_district").isNotNull()).agg(F.countDistinct('neighborhood_district')).show()

+----------------------------+
|count(neighborhood_district)|
+----------------------------+
|                          41|
+----------------------------+



In [84]:
# listing them

df_fire.select('neighborhood_district').where(F.col("neighborhood_district").isNotNull()).distinct().show(5)

+---------------------+
|neighborhood_district|
+---------------------+
|         Inner Sunset|
|       Haight Ashbury|
|         Lincoln Park|
|            Japantown|
|          North Beach|
+---------------------+
only showing top 5 rows



In [106]:
df_fire.select(df_fire.columns[0:4]).show(truncate=False)

+---------------+---------------+--------+----------------------------+
|Incident Number|Exposure Number|ID      |Address                     |
+---------------+---------------+--------+----------------------------+
|8028304        |0              |80283040|150 Elsie St.               |
|8028303        |0              |80283030|85 Turner Tr.               |
|8028309        |0              |80283090|175 6th St.                 |
|8028314        |0              |80283140|633 Hayes St.               |
|8028319        |0              |80283190|27th Av. / Cabrillo St.     |
|8028337        |0              |80283370|165 Belgrave Av.            |
|8028329        |0              |80283290|Grant Av. / Post St.        |
|8028350        |0              |80283500|Cortland Av. / Andover St.  |
|8028355        |0              |80283550|2nd St. / Brannan St.       |
|8028352        |0              |80283520|300 Ortega St.              |
|8028359        |0              |80283590|241 6th St.           

In [107]:
df_fire.explain(True)

== Parsed Logical Plan ==
Relation [Incident Number#839,Exposure Number#840,ID#841,Address#842,Incident Date#843,Call Number#844,Alarm DtTm#845,Arrival DtTm#846,Close DtTm#847,City#848,zipcode#849,Battalion#850,Station Area#851,Box#852,Suppression Units#853,Suppression Personnel#854,EMS Units#855,EMS Personnel#856,Other Units#857,Other Personnel#858,First Unit On Scene#859,Estimated Property Loss#860,Estimated Contents Loss#861,Fire Fatalities#862,... 40 more fields] csv

== Analyzed Logical Plan ==
Incident Number: int, Exposure Number: int, ID: int, Address: string, Incident Date: timestamp, Call Number: int, Alarm DtTm: timestamp, Arrival DtTm: timestamp, Close DtTm: timestamp, City: string, zipcode: string, Battalion: string, Station Area: string, Box: string, Suppression Units: int, Suppression Personnel: int, EMS Units: int, EMS Personnel: int, Other Units: int, Other Personnel: int, First Unit On Scene: string, Estimated Property Loss: int, Estimated Contents Loss: double, Fire 

### `SQL Tables`

#### Managed Table or Internal

In [22]:
spark.sql('create database learn_spark')

23/04/20 20:37:24 WARN ObjectStore: Failed to get database learn_spark, returning NoSuchObjectException
23/04/20 20:37:24 WARN ObjectStore: Failed to get database learn_spark, returning NoSuchObjectException
23/04/20 20:37:24 WARN ObjectStore: Failed to get database learn_spark, returning NoSuchObjectException


DataFrame[]

In [19]:
spark.sql('use learn_spark')

DataFrame[]

##### using saveAsTable

In [40]:
schema="`State` string, `Color` string, `Count` int"

In [41]:
df.write.saveAsTable("mnm_1",schema=schema)

##### using spark sql

In [63]:
# creating a managed table
spark.sql("create table mnm_2 (`State` string, `Color` string, `Count` int)")

23/04/20 21:03:01 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.
23/04/20 21:03:01 WARN HiveMetaStore: Location: file:/home/snehil/Desktop/data-engineering-resources/4_spark/spark_book/spark-warehouse/learn_spark.db/mnm_2 specified for non-external table:mnm_2


DataFrame[]

In [64]:
df.createOrReplaceTempView('mnm')

In [65]:
spark.sql("insert into mnm_2  select * from mnm")

DataFrame[]

In [71]:
spark.sql('describe extended mnm_2').show(truncate=False)

+----------------------------+------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                   |comment|
+----------------------------+------------------------------------------------------------------------------------------------------------+-------+
|State                       |string                                                                                                      |null   |
|Color                       |string                                                                                                      |null   |
|Count                       |int                                                                                                         |null   |
|                            |                                                                                  

#### Unmanaged Table or External

##### using sql

In [55]:

spark.sql("""create table mnm_2_unmanaged (`State` string, `Color` string, `Count` int) using csv options (PATH '/home/snehil/Desktop/data-engineering-resources/4_spark/spark_book/data/mnm.csv')""")

23/04/20 20:51:37 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider csv. Persisting data source table `spark_catalog`.`learn_spark`.`mnm_2_unmanaged` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


DataFrame[]

In [57]:
spark.sql('select * from mnm_2_unmanaged').show(5)

+-----+-----+-----+
|State|Color|Count|
+-----+-----+-----+
|State|Color| null|
|   TX|  Red|   20|
|   NV| Blue|   66|
|   CO| Blue|   79|
|   OR| Blue|   71|
+-----+-----+-----+
only showing top 5 rows



In [54]:
#spark.sql('drop table mnm_2_unmanaged')

23/04/20 20:51:27 WARN HadoopFSUtils: The directory file:/home/snehil/Desktop/data-engineering-resources/4_spark/spark_book/data/mnm1.csv was not found. Was it deleted very recently?


DataFrame[]

##### using saveAsTable

In [35]:
df.write.mode('overwrite').option("path", 'mnm_1_unmanaged').saveAsTable('mnm_1_unmanaged')

In [34]:
spark.sql('describe extended mnm_1_unmanaged').show(truncate=False)

+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                             |comment|
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|State                       |string                                                                                                                |null   |
|Color                       |string                                                                                                                |null   |
|Count                       |int                                                                                                                   |null   |
|                            |                      

##### Reading the saved table

In [31]:
df_read = spark.table('learn_spark.mnm_2')
df_read.show(5)

23/04/20 21:51:30 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


+-----+------+-----+
|State| Color|Count|
+-----+------+-----+
|   TX|   Red|   20|
|   NV|  Blue|   66|
|   CO|  Blue|   79|
|   OR|  Blue|   71|
|   WA|Yellow|   93|
+-----+------+-----+
only showing top 5 rows



### `SQL Views`

In [79]:
df_agg = spark.sql('select `State`,`Color`,count(`Count`) from learn_spark.mnm_1 group by `State`,`Color`')

# global temporary view
df_agg.createOrReplaceGlobalTempView('global_temp_mnm_view')

In [80]:
# temporary view
df_agg.createOrReplaceTempView('global_temp_mnm_view')

Note : when accessing a global temporary view you must use the prefix global_temp.<view_name>

In [81]:
spark.sql('select * from global_temp.global_temp_mnm_view').show(5)

+-----+------+------------+
|State| Color|count(Count)|
+-----+------+------------+
|   WY| Green|        1695|
|   NV|   Red|        1610|
|   UT|  Blue|        1655|
|   WA|Orange|        1658|
|   NM| Green|        1682|
+-----+------+------------+
only showing top 5 rows



### `Viewing the Metadata - Spark Catalog`

In [86]:
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/home/snehil/Desktop/data-engineering-resources/4_spark/spark_book/spark-warehouse'),
 Database(name='learn_spark', catalog='spark_catalog', description='', locationUri='file:/home/snehil/Desktop/data-engineering-resources/4_spark/spark_book/spark-warehouse/learn_spark.db')]

In [87]:
spark.catalog.listTables()

[Table(name='mnm_1', catalog='spark_catalog', namespace=['learn_spark'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='mnm_1_unmanaged', catalog='spark_catalog', namespace=['learn_spark'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='mnm_2', catalog='spark_catalog', namespace=['learn_spark'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='mnm_2_unmanaged', catalog='spark_catalog', namespace=['learn_spark'], description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='global_temp_mnm_view', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [90]:
spark.catalog.listColumns("learn_spark.mnm_2")

[Column(name='State', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='Color', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='Count', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False)]

In [91]:
spark.sql('describe extended global_temp.global_temp_mnm_view').show(truncate=False)

+------------+---------+-------+
|col_name    |data_type|comment|
+------------+---------+-------+
|State       |string   |null   |
|Color       |string   |null   |
|count(Count)|bigint   |null   |
+------------+---------+-------+



### `Caching`

-- In SQL

* CACHE [LAZY] TABLE <table-name>

* UNCACHE TABLE <table-name>

### `Reading an Image file in a dataframe`

In [113]:
from pyspark.ml import image
image_dir = 'data/train_images/'
image_df = spark.read.format("image").load(image_dir)

In [114]:
image_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [119]:
image_df.select('image.origin','image.height','image.width','image.nChannels','image.mode').show(truncate=False)

+--------------------------------------------------------------------------------------------------------+------+-----+---------+----+
|origin                                                                                                  |height|width|nChannels|mode|
+--------------------------------------------------------------------------------------------------------+------+-----+---------+----+
|file:///home/snehil/Desktop/data-engineering-resources/4_spark/spark_book/data/train_images/IMG_0074.PNG|1135  |1825 |4        |24  |
|file:///home/snehil/Desktop/data-engineering-resources/4_spark/spark_book/data/train_images/IMG_0073.PNG|1172  |2360 |4        |24  |
+--------------------------------------------------------------------------------------------------------+------+-----+---------+----+

