In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext

## fillna() & fill() – Replace NULL/None Values

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("Foundation4").getOrCreate()

In [None]:
df = spark.read.options(header='true', inferSchema='true').csv("./small_zipcode.csv")
df.printSchema()
df.show(truncate=False)

In [10]:
#Replace 0 for null for all integer columns
df.na.fill(value=0).show()

#Replace Replace 0 for null on only population column 
df.na.fill(value=0,subset=["population"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|         0|
+---+-------+--------+-------------------+-----+----------+

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               nu

In [11]:
# Replace Null/None Value with Empty String
df.na.fill("").show(truncate=False)

+---+-------+--------+-------------------+-----+----------+
|id |zipcode|type    |city               |state|population|
+---+-------+--------+-------------------+-----+----------+
|1  |704    |STANDARD|                   |PR   |30100     |
|2  |704    |        |PASEO COSTA DEL SUR|PR   |null      |
|3  |709    |        |BDA SAN LUIS       |PR   |3700      |
|4  |76166  |UNIQUE  |CINGULAR WIRELESS  |TX   |84000     |
|5  |76177  |STANDARD|                   |TX   |null      |
+---+-------+--------+-------------------+-----+----------+



In [12]:
df.na.fill("unknown",["city"]).na.fill("",["type"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|            unknown|   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|            unknown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [13]:
df.na.fill({"city": "unknown", "type": ""}).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|            unknown|   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|            unknown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



## Pivot and Unpivot DataFrame


In [3]:
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



In [5]:
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+



In [7]:
# Unpivot PySpark DataFrame
from pyspark.sql.functions import expr
unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
unPivotDF.show(truncate=False)

+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
|Orange |China  |4000 |
|Beans  |China  |1500 |
|Beans  |Mexico |2000 |
|Banana |Canada |2000 |
|Banana |China  |400  |
|Carrots|Canada |2000 |
|Carrots|China  |1200 |
+-------+-------+-----+



In [13]:
#partitionBy()
df1 = spark.read.option("header",True) \
        .csv("./small_zipcode.csv")
df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- population: string (nullable = true)



In [17]:
#partitionBy()
df1.write.option("header",True) \
        .partitionBy("state") \
        .mode("overwrite") \
        .csv("./tmp/zipcodes-state")

In [18]:
# Read a Specific Partition
dfSinglePart=spark.read.option("header",True) \
            .csv("./tmp/zipcodes-state/state=PR")
dfSinglePart.printSchema()
dfSinglePart.show()

root
 |-- id: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- population: string (nullable = true)

+---+-------+--------+-------------------+----------+
| id|zipcode|    type|               city|population|
+---+-------+--------+-------------------+----------+
|  1|    704|STANDARD|               null|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|      null|
|  3|    709|    null|       BDA SAN LUIS|      3700|
+---+-------+--------+-------------------+----------+



## PySpark SQL Date and Timestamp Functions

In [3]:
from pyspark.sql.functions import *
data = [["1","2020-02-01"],["2","2020-03-01"],["3","2020-03-01"]]
df = spark.createDataFrame(data,["id","input"])
df.show()

+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2020-03-01|
|  3|2020-03-01|
+---+----------+



In [5]:
# current_date()
df.select(current_date().alias("current_date")).show(1)

+------------+
|current_date|
+------------+
|  2021-05-10|
+------------+
only showing top 1 row



In [18]:
# date_format() 
# to parses the date and converts from yyyy-mm-dd to MM-dd-yyyy format.
# date_format(dateExpr,format)
df.select(col("input"), date_format(col("input"),"MM-dd-yyyy").alias("date_format")).show()

+----------+-----------+
|     input|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2020-03-01| 03-01-2020|
|2020-03-01| 03-01-2020|
+----------+-----------+



In [20]:
# to_date()
# converts string in date format yyyy-MM-dd to a DateType yyyy-MM-dd using to_date()
# to_date(column, fomat)
df.select(col("input"),to_date(col("input"), "yyyy-MM-dd").alias("to_date")).show()

+----------+----------+
|     input|   to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2020-03-01|2020-03-01|
|2020-03-01|2020-03-01|
+----------+----------+



In [24]:
# datediff()
# returns the difference between two dates
# datediff(end, start)
df.select(col("input"),datediff(current_date(),col("input")).alias("datediff")).show()

+----------+--------+
|     input|datediff|
+----------+--------+
|2020-02-01|     464|
|2020-03-01|     435|
|2020-03-01|     435|
+----------+--------+



In [25]:
# months_between()
# returns the months between two dates
# months_between(end, start, roundOff)
df.select(col("input"),months_between(current_date(),col("input")).alias("months_between")).show()

+----------+--------------+
|     input|months_between|
+----------+--------------+
|2020-02-01|   15.29032258|
|2020-03-01|   14.29032258|
|2020-03-01|   14.29032258|
+----------+--------------+



In [28]:
# trunc()
# truncates the date at a specified unit
# trunc(column, format)
df.select(col("input"), 
    trunc(col("input"),"Month").alias("Month_Trunc"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()

+----------+-----------+----------+-----------+
|     input|Month_Trunc|Month_Year|Month_Trunc|
+----------+-----------+----------+-----------+
|2020-02-01| 2020-02-01|2020-01-01| 2020-02-01|
|2020-03-01| 2020-03-01|2020-01-01| 2020-03-01|
|2020-03-01| 2020-03-01|2020-01-01| 2020-03-01|
+----------+-----------+----------+-----------+



In [29]:
# add_months() , date_add(), date_sub()
df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()

+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2020-03-01|2020-06-01|2019-12-01|2020-03-05|2020-02-26|
|2020-03-01|2020-06-01|2019-12-01|2020-03-05|2020-02-26|
+----------+----------+----------+----------+----------+



In [31]:
# year(), month(), month(),next_day(), weekofyear()
df.select(col("input"), 
     year(col("input")).alias("year"), 
     month(col("input")).alias("month"), 
     next_day(col("input"),"Friday").alias("next_day"), 
     weekofyear(col("input")).alias("weekofyear") 
  ).show()

+----------+----+-----+----------+----------+
|     input|year|month|  next_day|weekofyear|
+----------+----+-----+----------+----------+
|2020-02-01|2020|    2|2020-02-07|         5|
|2020-03-01|2020|    3|2020-03-06|         9|
|2020-03-01|2020|    3|2020-03-06|         9|
+----------+----+-----+----------+----------+



In [32]:
# dayofweek(), dayofmonth(), dayofyear()
df.select(col("input"),  
     dayofweek(col("input")).alias("dayofweek"), 
     dayofmonth(col("input")).alias("dayofmonth"), 
     dayofyear(col("input")).alias("dayofyear"), 
  ).show()

+----------+---------+----------+---------+
|     input|dayofweek|dayofmonth|dayofyear|
+----------+---------+----------+---------+
|2020-02-01|        7|         1|       32|
|2020-03-01|        1|         1|       61|
|2020-03-01|        1|         1|       61|
+----------+---------+----------+---------+



In [33]:
# create data frame 2
data=[["1","02-01-2020 11 01 19 06"],
      ["2","03-01-2019 12 01 19 406"],
      ["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |02-01-2020 11 01 19 06 |
|2  |03-01-2019 12 01 19 406|
|3  |03-01-2021 12 01 19 406|
+---+-----------------------+



In [34]:
df2.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

+-----------------------+
|current_timestamp      |
+-----------------------+
|2021-05-10 16:37:42.477|
+-----------------------+
only showing top 1 row



In [46]:
# to_timestamp()
# Converts string timestamp to Timestamp type format.
# The default format of the Spark Timestamp is yyyy-MM-dd HH:mm:ss.SSSS
df3 = df2.select(col("input"), 
    to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp") 
  )
df3.show(truncate=False)

+-----------------------+-----------------------+
|input                  |to_timestamp           |
+-----------------------+-----------------------+
|02-01-2020 11 01 19 06 |2020-02-01 11:01:19.06 |
|03-01-2019 12 01 19 406|2019-03-01 12:01:19.406|
|03-01-2021 12 01 19 406|2021-03-01 12:01:19.406|
+-----------------------+-----------------------+



In [47]:
# hour(), Minute() and second()
df3.select(col("to_timestamp"), 
    hour(col("to_timestamp")).alias("hour"), 
    minute(col("to_timestamp")).alias("minute"),
    second(col("to_timestamp")).alias("second") 
  ).show(truncate=False)

+-----------------------+----+------+------+
|to_timestamp           |hour|minute|second|
+-----------------------+----+------+------+
|2020-02-01 11:01:19.06 |11  |1     |19    |
|2019-03-01 12:01:19.406|12  |1     |19    |
|2021-03-01 12:01:19.406|12  |1     |19    |
+-----------------------+----+------+------+



## JSON Functions

In [48]:
jsonString="""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
df4=spark.createDataFrame([(1, jsonString)],["id","value"])
df4.show(truncate=False)

+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+



In [50]:
# from_json()
#  to convert JSON string into Struct type or Map type
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json
df5=df4.withColumn("value",from_json(df4.value,MapType(StringType(),StringType())))
df5.printSchema()
df5.show(truncate=False)

root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+



In [51]:
# to_json()
# to convert DataFrame columns MapType or Struct type to JSON string.
from pyspark.sql.functions import to_json,col
df5.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)

+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+



In [52]:
# json_tuple()
# used the query or extract the elements from JSON column and create the result as a new columns.
from pyspark.sql.functions import json_tuple
df4.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)

+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City       |
+---+-------+-----------+-----------+
|1  |704    |STANDARD   |PARC PARQUE|
+---+-------+-----------+-----------+



In [53]:
# get_json_object()
# to extract the JSON string based on path from the JSON column.
from pyspark.sql.functions import get_json_object
df4.select(col("id"),get_json_object(col("value"),"$.ZipCodeType").alias("ZipCodeType")) \
    .show(truncate=False)

+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+



In [54]:
# schema_of_json()
# Use schema_of_json() to create schema string from JSON string column.
from pyspark.sql.functions import schema_of_json,lit
schemaStr=spark.range(1) \
    .select(schema_of_json(lit("""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""))) \
    .collect()[0][0]
print(schemaStr)

STRUCT<`City`: STRING, `State`: STRING, `ZipCodeType`: STRING, `Zipcode`: BIGINT>
