In [1]:
from pyspark import SparkContext as sc

In [2]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

In [3]:
# In Python
# Create an RDD of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30),
                            ("TD", 35), ("Brooke", 25)])
# Use map and reduceByKey transformations with their lambda
# expressions to aggregate and then compute average
agesRDD = (dataRDD
            .map(lambda x: (x[0], (x[1], 1)))
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
            .map(lambda x: (x[0], x[1][0]/x[1][1])))
agesRDD.collect()

[('Brooke', 22.5), ('Denny', 31.0), ('Jules', 30.0), ('TD', 35.0)]

In [4]:
# In Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# Create a DataFrame using SparkSession
spark = (SparkSession
    .builder
    .appName("AuthorsAges")
    .getOrCreate())
# sc = spark.sparkContext
# Create a DataFrame
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30),
     ("TD", 35), ("Brooke", 25)], ["name", "age"])
# Group the same names together, aggregate their ages, and compute an average
avg_df = data_df.groupBy("name").agg(avg("age"))
# Show the results of the final execution
avg_df.show()

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Jules|    30.0|
|    TD|    35.0|
| Denny|    31.0|
+------+--------+



In [5]:
spark = [SparkSession
    .builder
    .appName("AuthorsAges")
    .getOrCreate()]
spark

[<pyspark.sql.session.SparkSession at 0x231c354ff70>]

In [6]:
type(spark)

list

In [7]:
spark = (SparkSession
    .builder
    .appName("AuthorsAges")
    .getOrCreate())
spark

In [8]:
type(spark)

pyspark.sql.session.SparkSession

In [9]:
spark = SparkSession\
    .builder\
    .appName("AuthorsAges")\
    .getOrCreate()
spark

In [10]:
type(spark)

pyspark.sql.session.SparkSession

In [11]:
a = 2
b = 2,
c = (2)

In [12]:
type(a), type(b), type(c)

(int, tuple, int)

In [13]:
spark = (SparkSession
    .builder
    .appName("AuthorsAges")
    .getOrCreate(),)
spark

(<pyspark.sql.session.SparkSession at 0x231c354ff70>,)

In [14]:
type(spark)

tuple

In [15]:
spark = (SparkSession
    .builder
    .appName("AuthorsAges")
    .getOrCreate())
spark

In [16]:
type(spark)

pyspark.sql.session.SparkSession

In [17]:
lst = 2,3,4,
5

5

In [18]:
lst

(2, 3, 4)

In [19]:
lst = (2,3,4,
5)

In [20]:
lst

(2, 3, 4, 5)

> Conclusion - While creating the spark object we are using parenthesis ("()") because we are initializing it in multiline. If we do not put comma after something even if it is in parenthesis ("()") python will not treat it as a tuple instead it will treat it like the type of object inside the parenthesis

### Two ways to define a schema
Spark allows you to define a schema in two ways. One is to define it programmati‐
cally, and the other is to employ a Data Definition Language (DDL) string, which is
much simpler and easier to read.

In [21]:
# Programmatically defining a schema

# In Python
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False)])
type(schema)

pyspark.sql.types.StructType

In [22]:
# defining schema using DDL

# In Python
from pyspark.sql import SparkSession
# Define schema for our data using DDL
schema = "Id INT, First STRING, Last STRING, Url STRING,\
    Published STRING, Hits INT, Campaigns ARRAY<STRING>"
type(schema)

str

In [23]:
# Create our static data
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter",
        "LinkedIn"]],
    [2, "Brooke","Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter",
        "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web",
        "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568,
        ["twitter", "FB"]],
    [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web",
        "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568,
        ["twitter", "LinkedIn"]]
    ]

In [24]:
# Main program
if __name__ == "__main__":
# Create a SparkSession
    spark = (SparkSession
            .builder
            .appName("Example-3_6")
            .getOrCreate())
    # Create a DataFrame using the schema defined above
    blogs_df = spark.createDataFrame(data, schema)
    # Show the DataFrame; it should reflect our table above
    blogs_df.show()
    # Print the schema used by Spark to process the DataFrame
    print(blogs_df.printSchema())

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (

In [25]:
# Create a DataFrame with the explicit schema specified.

from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
spark.createDataFrame([('Alice', 1)], schema).show()

+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+



In [26]:
# Create a DataFrame with the schema in DDL formatted string.

spark.createDataFrame([('Alice', 1)], "name: string, age: int").show()

+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+



In [27]:
import pyspark.sql.types as T                   

# here is the traditional way to define a shema in PySpark
schema = T.StructType([
  T.StructField("col1", T.StringType(), True),
  T.StructField("col2", T.IntegerType(), True),
  T.StructField("col3", T.TimestampType(), True)
])
type(schema)

pyspark.sql.types.StructType

In [28]:
import pyspark.sql.types as T                   

# and here is the way using the helper function out of types
ddl_schema_string = "col1 string, col2 integer, col3 timestamp"
ddl_schema = T._parse_datatype_string(ddl_schema_string)
type(ddl_schema)

pyspark.sql.types.StructType

In [29]:
T._parse_datatype_string(ddl_schema_string)

StructType([StructField('col1', StringType(), True), StructField('col2', IntegerType(), True), StructField('col3', TimestampType(), True)])

In [30]:
#If you want to use this schema elsewhere in your code, simply execute# blogs_df.schema and it will return the schema definition:
blogs_df.schema

StructType([StructField('Id', IntegerType(), True), StructField('First', StringType(), True), StructField('Last', StringType(), True), StructField('Url', StringType(), True), StructField('Published', StringType(), True), StructField('Hits', IntegerType(), True), StructField('Campaigns', ArrayType(StringType(), True), True)])

In [31]:
from pyspark.sql.functions import *

# Show columns and expressions
blogs_df.select(expr("Hits") * 2).show(2)
blogs_df.select(col("Hits") * 2).show(2)
blogs_df.select(expr("Hits * 2")).show(2)
# show heavy hitters
blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show()

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|405

In [32]:
blogsDF = blogs_df

In [33]:
# // Use an expression to compute big hitters for blogs
# // This adds a new column, Big Hitters, based on the conditional expression
blogsDF.withColumn("Big Hitters", (expr("Hits > 10000"))).show()

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|       true|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|       true|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+



In [34]:
# // Concatenate three columns, create a new column, and show the
# // newly created concatenated column
(blogsDF
.withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id"))))
.select(col("AuthorsId"))
.show(4))

+-------------+
|    AuthorsId|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
+-------------+
only showing top 4 rows



In [35]:
# // These statements return the same value, showing that
# // expr is the same as a col method call
blogsDF.select(expr("Hits")).show(2)
blogsDF.select(col("Hits")).show(2)
blogsDF.select("Hits").show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



In [36]:
# // Sort by column "Id" in descending order
blogsDF.sort(col("Id").desc()).show()
blogsDF.sort(expr("Id").desc()).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+

### Rows

In [37]:
# In Python
from pyspark.sql import Row
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
    ["twitter", "LinkedIn"])
# access using index for individual items
blog_row[1], blog_row

('Reynold',
 <Row(6, 'Reynold', 'Xin', 'https://tinyurl.6', 255568, '3/2/2015', ['twitter', 'LinkedIn'])>)

In [38]:
# In Python
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



In [39]:
rows

[<Row('Matei Zaharia', 'CA')>, <Row('Reynold Xin', 'CA')>]

In [40]:
rows[0]

<Row('Matei Zaharia', 'CA')>

In [41]:
rows[0][0]

'Matei Zaharia'

In [42]:
# In Python
df = spark.range(0, 10000, 1, 8) # this code will create a DataFrame of 10,000 integers distributed over eight parti‐
# tions in memory 
print(df.rdd.getNumPartitions()) # it should print 8

8


### Common DataFrame Operations

In [43]:
# In Python, define a schema
from pyspark.sql.types import *
# Programmatic way to define a schema
fire_schema = StructType(
    [StructField('CallNumber', IntegerType(), True),
    StructField('UnitID', StringType(), True),
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('CallDate', StringType(), True),
    StructField('WatchDate', StringType(), True),
    StructField('CallFinalDisposition', StringType(), True),
    StructField('AvailableDtTm', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Zipcode', IntegerType(), True),
    StructField('Battalion', StringType(), True),
    StructField('StationArea', StringType(), True),
    StructField('Box', StringType(), True),
    StructField('OriginalPriority', StringType(), True),
    StructField('Priority', StringType(), True),
    StructField('FinalPriority', IntegerType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True),
    StructField('NumAlarms', IntegerType(), True),
    StructField('UnitType', StringType(), True),
    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
    StructField('FirePreventionDistrict', StringType(), True),
    StructField('SupervisorDistrict', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('RowID', StringType(), True),
    StructField('Delay', FloatType(), True)]
)

# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

In [44]:
print(fire_df.take(1))

[Row(CallNumber=20110016, UnitID='T13', IncidentNumber=2003235, CallType='Structure Fire', CallDate='01/11/2002', WatchDate='01/10/2002', CallFinalDisposition='Other', AvailableDtTm='01/11/2002 01:51:44 AM', Address='2000 Block of CALIFORNIA ST', City='SF', Zipcode=94109, Battalion='B04', StationArea='38', Box='3362', OriginalPriority='3', Priority='3', FinalPriority=3, ALSUnit=False, CallTypeGroup=None, NumAlarms=1, UnitType='TRUCK', UnitSequenceInCallDispatch=2, FirePreventionDistrict='4', SupervisorDistrict='5', Neighborhood='Pacific Heights', Location='(37.7895840679362, -122.428071912459)', RowID='020110016-T13', Delay=2.950000047683716)]


In [45]:
fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [46]:
print(fire_df.show(5, truncate=False))

+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------------+-------------------------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallDate  |WatchDate |CallFinalDisposition|AvailableDtTm         |Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood         |Location                             |RowID        |Delay    |
+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+--

In [47]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [48]:
fire_df.take(1)

[Row(CallNumber=20110016, UnitID='T13', IncidentNumber=2003235, CallType='Structure Fire', CallDate='01/11/2002', WatchDate='01/10/2002', CallFinalDisposition='Other', AvailableDtTm='01/11/2002 01:51:44 AM', Address='2000 Block of CALIFORNIA ST', City='SF', Zipcode=94109, Battalion='B04', StationArea='38', Box='3362', OriginalPriority='3', Priority='3', FinalPriority=3, ALSUnit=False, CallTypeGroup=None, NumAlarms=1, UnitType='TRUCK', UnitSequenceInCallDispatch=2, FirePreventionDistrict='4', SupervisorDistrict='5', Neighborhood='Pacific Heights', Location='(37.7895840679362, -122.428071912459)', RowID='020110016-T13', Delay=2.950000047683716)]

In [49]:
fire_df.show(2)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+------

In [50]:
fire_df.show(2, truncate=False)

+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------------+-------------------------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|CallType        |CallDate  |WatchDate |CallFinalDisposition|AvailableDtTm         |Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood         |Location                             |RowID        |Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+

In [51]:
%%html
<style>
div.output_area pre {
    white-space: pre;
}

In [52]:
fire_df.show(2, truncate=True)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+------

In [53]:
fire_df.show(2, truncate=False)

+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------------+-------------------------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|CallType        |CallDate  |WatchDate |CallFinalDisposition|AvailableDtTm         |Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood         |Location                             |RowID        |Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+----------------------+---------------------------+----+-------+---------+

In [54]:
!python --version

Python 3.10.12


In [55]:
!java --version

java 17.0.9 2023-10-17 LTS
Java(TM) SE Runtime Environment (build 17.0.9+11-LTS-201)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.9+11-LTS-201, mixed mode, sharing)


In [56]:
# In Python to save as a Parquet file
parquet_path = "written_data/parquet_file_1"
fire_df.write.mode("overwrite").format("parquet").save(parquet_path) # parquet file will be written inside written_data/parquet_file_1 folder

fire_df.write.mode("overwrite").format("parquet").save("written_data/parquet_file_2") # parquet file will be written inside written_data/parquet_file_2 folder

# Another way
fire_df.write.mode("overwrite").parquet("written_data/parquet_file_3") # parquet file will be written inside written_data/parquet_file_3 folder

In [57]:
# In Python to save as a csv file
csv_path = "written_data/csv_file_1"
fire_df.write.mode("overwrite").format("csv").save(csv_path) # csv file will be written inside written_data/csv_file_1 folder

fire_df.write.mode("overwrite").format("csv").save("written_data/csv_file_2") # csv file will be written inside written_data/csv_file_2 folder

# Another way
fire_df.write.mode("overwrite").csv("written_data/csv_file_3") # csv file will be written inside written_data/csv_file_3 folder

In [58]:
# In Python Writing to SQL table named parquet_table
# 
# parquet_table = parquet_table # name of the table
# fire_df.write.format("parquet").saveAsTable(parquet_table)

In [59]:
# In Python
few_fire_df = (fire_df
.select("IncidentNumber", "AvailableDtTm", "CallType")
.where(col("CallType") != "Medical Incident"))
few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [60]:
# In Python, return number of distinct types of calls using countDistinct()
from pyspark.sql.functions import *
(fire_df.select("CallType")
.where(col("CallType").isNotNull())
.agg(countDistinct("CallType").alias("DistinctCallTypes"))
.show())

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [61]:
# In Python, filter for only distinct non-null CallTypes from all the rows
(fire_df
.select("CallType")
.where(col("CallType").isNotNull())
.distinct()
.show(10, False))

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



#### Renaming, adding, and dropping columns.

In [62]:
# In Python
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(new_fire_df
.select("ResponseDelayedinMins")
.where(col("ResponseDelayedinMins") > 5)
.show(5, False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [63]:
# In Python
fire_ts_df = (new_fire_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("AvailableDtTm"))

In [64]:
# Select the converted columns
(fire_ts_df
    .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
    .show(5, False))

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [65]:
(fire_ts_df
    .select(year('IncidentDate'))
    .distinct()
    .orderBy(year('IncidentDate'))
    .show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [66]:
(fire_ts_df.printSchema())

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =

#### Aggregations

In [67]:
(fire_ts_df
.select(col("CallType"), col("CallType"))# , count(col("CallType")))
.where(col("CallType").isNotNull())
.groupBy("CallType")
.count()
.orderBy("count", ascending=False)
.show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [68]:
(fire_ts_df
.createOrReplaceTempView("fire_ts_df_tbl")
)

In [69]:
print(fire_ts_df.columns)

['CallNumber', 'UnitID', 'IncidentNumber', 'CallType', 'CallFinalDisposition', 'Address', 'City', 'Zipcode', 'Battalion', 'StationArea', 'Box', 'OriginalPriority', 'Priority', 'FinalPriority', 'ALSUnit', 'CallTypeGroup', 'NumAlarms', 'UnitType', 'UnitSequenceInCallDispatch', 'FirePreventionDistrict', 'SupervisorDistrict', 'Neighborhood', 'Location', 'RowID', 'ResponseDelayedinMins', 'IncidentDate', 'OnWatchDate', 'AvailableDtTS']


In [70]:
(spark.sql
("SELECT * FROM FIRE_TS_DF_TBL limit 10")
).show()

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+------

In [71]:
(spark.sql('SELECT YEAR(IncidentDate) AS YEAR, count(IncidentNumber) AS CNT FROM FIRE_TS_DF_TBL GROUP BY YEAR(IncidentDate) ORDER BY CNT DESC limit 5')).show()

+----+-----+
|YEAR|  CNT|
+----+-----+
|2017|12135|
|2016|11609|
|2015|11458|
|2014|10775|
|2018|10136|
+----+-----+



In [72]:
(spark.sql('SELECT TRIM(TO_DATE(IncidentDate, "YYYY")) YEAR, count(IncidentNumber) CNT FROM FIRE_TS_DF_TBL GROUP BY TRIM(TO_DATE(IncidentDate, "YYYY")) ORDER BY CNT LIMIT 5')).show()

+----------+---+
|      YEAR|CNT|
+----------+---+
|2000-04-12|  2|
|2001-11-25|  5|
|2006-10-15|  7|
|2001-08-26|  7|
|2007-02-07|  8|
+----------+---+



In [73]:
(spark.sql
("SELECT distinct(IncidentNumber) FROM FIRE_TS_DF_TBL limit 10")
).show()

+--------------+
|IncidentNumber|
+--------------+
|       2004619|
|       2006963|
|       2007184|
|       2007484|
|       2008576|
|       2014509|
|       2019031|
|       2026677|
|       2026940|
|       2027275|
+--------------+



In [74]:
(spark.sql
("SELECT distinct(CALLNUMBER) FROM FIRE_TS_DF_TBL limit 10")
).show()

+----------+
|CALLNUMBER|
+----------+
|  20140023|
|  20210263|
|  20380045|
|  20570327|
|  20860368|
|  20900223|
|  21240138|
|  21250076|
|  21390387|
|  21410339|
+----------+



In [75]:
# df.groupBy(“col1”).agg({“col2”: “avg”}).orderBy(“col1”)

In [76]:
(fire_ts_df
.select("CallType")
.where(col("CallType").isNotNull())
.groupBy("CallType")
.count()
.orderBy("count", ascending=False)
.show(n=5, truncate=False))

+-----------------------------+------+
|CallType                     |count |
+-----------------------------+------+
|Medical Incident             |113794|
|Structure Fire               |23319 |
|Alarms                       |19406 |
|Traffic Collision            |7013  |
|Citizen Assist / Service Call|2524  |
+-----------------------------+------+
only showing top 5 rows



In [77]:
(fire_ts_df
    # .select("CallType")
    .where(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
.show(n=5, truncate=False))

+-----------------------------+------+
|CallType                     |count |
+-----------------------------+------+
|Medical Incident             |113794|
|Structure Fire               |23319 |
|Alarms                       |19406 |
|Traffic Collision            |7013  |
|Citizen Assist / Service Call|2524  |
+-----------------------------+------+
only showing top 5 rows



In [78]:
(fire_ts_df
    .groupBy(year("IncidentDate"))
    .count()
).show(4)

+------------------+-----+
|year(IncidentDate)|count|
+------------------+-----+
|              2003| 8499|
|              2007| 8255|
|              2018|10136|
|              2015|11458|
+------------------+-----+
only showing top 4 rows



In [79]:
(fire_ts_df
    .groupBy(year("IncidentDate").alias("Year"), "CallType")
    .count().alias("Count of incidents")
     .orderBy("year", ascending = False)
).show(4)

+----+--------------------+-----+
|Year|            CallType|count|
+----+--------------------+-----+
|2018|Citizen Assist / ...|  113|
|2018|       Assist Police|    1|
|2018|          Fuel Spill|   10|
|2018|    Medical Incident| 7004|
+----+--------------------+-----+
only showing top 4 rows



In [80]:
(fire_ts_df
 .select("IncidentDate", "CallType")
 .groupBy(year("IncidentDate").alias("Year"), "incidentdate")
    .count()
    .orderBy("count", ascending=False)
).show(3)

+----+-------------------+-----+
|Year|       incidentdate|count|
+----+-------------------+-----+
|2017|2017-09-01 00:00:00|   75|
|2017|2017-09-02 00:00:00|   66|
|2018|2018-01-01 00:00:00|   58|
+----+-------------------+-----+
only showing top 3 rows



In [81]:
fire_ts_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =

In [82]:
# df.groupBy(“col1”).agg({“col2”: “avg”}).orderBy(“col1”)

In [83]:
(
    fire_ts_df
    .select(avg("NumAlarms"))
    # .avg()
).show(3)

+------------------+
|    avg(NumAlarms)|
+------------------+
|1.0049858525009128|
+------------------+



In [84]:
(
    fire_ts_df
    .select(count("NumAlarms"))
    # .avg()
).show(3)

+----------------+
|count(NumAlarms)|
+----------------+
|          175296|
+----------------+



In [85]:
(
    fire_ts_df
    .where(col("NumAlarms").isNotNull())
    .groupBy("NumAlarms")
    .count()
).show(3)

+---------+------+
|NumAlarms| count|
+---------+------+
|        1|174718|
|        3|   121|
|        5|    17|
+---------+------+
only showing top 3 rows



In [86]:
(
    fire_ts_df
    .select(count("*"))
    # .avg()
).show(3)

+--------+
|count(1)|
+--------+
|  175296|
+--------+



In [87]:
(
    fire_ts_df
    .groupBy(day("IncidentDate"))
    .count()
).show(3)

+-----------------+-----+
|day(IncidentDate)|count|
+-----------------+-----+
|               31| 3360|
|               28| 5732|
|               26| 5491|
+-----------------+-----+
only showing top 3 rows



In [88]:
(fire_ts_df
.groupBy(year("IncidentDate"))
# .mean()
 .count()
.orderBy("count", ascending=False)
).show(3)

+------------------+-----+
|year(IncidentDate)|count|
+------------------+-----+
|              2017|12135|
|              2016|11609|
|              2015|11458|
+------------------+-----+
only showing top 3 rows



In [89]:
(fire_ts_df
.groupBy(day("IncidentDate"))
# .mean()
 .count()
.orderBy("count", ascending=False)
).show(3)

+-----------------+-----+
|day(IncidentDate)|count|
+-----------------+-----+
|                1| 6233|
|                4| 6048|
|                5| 5931|
+-----------------+-----+
only showing top 3 rows



In [90]:
# group by date group by call type - see average callnumber on that date of call type

In [91]:
# see year wise average number of calls

In [92]:
# AVG
df = spark.sql("""SELECT ACCT,AMT,TXN_DT FROM VALUES 
(101,10.01, DATE'2021-01-01'),
(101,102.01, DATE'2021-01-01'),
(102,93., DATE'2021-01-01'),
(103,913.1, DATE'2021-01-02'),
(102,913.1, DATE'2021-01-02'),
(101,900.56, DATE'2021-01-03')
AS TXN(ACCT,AMT,TXN_DT) """)

In [93]:
df.show(1)

+----+-----+----------+
|ACCT|  AMT|    TXN_DT|
+----+-----+----------+
| 101|10.01|2021-01-01|
+----+-----+----------+
only showing top 1 row



In [94]:
print(fire_ts_df.columns)

['CallNumber', 'UnitID', 'IncidentNumber', 'CallType', 'CallFinalDisposition', 'Address', 'City', 'Zipcode', 'Battalion', 'StationArea', 'Box', 'OriginalPriority', 'Priority', 'FinalPriority', 'ALSUnit', 'CallTypeGroup', 'NumAlarms', 'UnitType', 'UnitSequenceInCallDispatch', 'FirePreventionDistrict', 'SupervisorDistrict', 'Neighborhood', 'Location', 'RowID', 'ResponseDelayedinMins', 'IncidentDate', 'OnWatchDate', 'AvailableDtTS']


In [95]:
df.groupBy("TXN_DT").agg(avg("AMT").alias("AVG_AMT")).show(1)

+----------+---------+
|    TXN_DT|  AVG_AMT|
+----------+---------+
|2021-01-01|68.340000|
+----------+---------+
only showing top 1 row



In [96]:
(
    fire_ts_df
    .groupBy(day("IncidentDate"))
    .agg(sum("NumAlarms"))
).show(3)

+-----------------+--------------+
|day(IncidentDate)|sum(NumAlarms)|
+-----------------+--------------+
|               31|          3400|
|               28|          5781|
|               26|          5529|
+-----------------+--------------+
only showing top 3 rows



In [97]:
(
    fire_ts_df
    .groupBy(day("IncidentDate"))
    .agg(avg("NumAlarms"))
).show(3)

+-----------------+------------------+
|day(IncidentDate)|    avg(NumAlarms)|
+-----------------+------------------+
|               31|1.0119047619047619|
|               28|1.0085484996510816|
|               26|1.0069204152249136|
+-----------------+------------------+
only showing top 3 rows



In [98]:
(
    fire_ts_df
    .groupBy(day("IncidentDate"))
    .agg(count(day("IncidentDate")))
).show(3)

+-----------------+------------------------+
|day(IncidentDate)|count(day(IncidentDate))|
+-----------------+------------------------+
|               31|                    3360|
|               28|                    5732|
|               26|                    5491|
+-----------------+------------------------+
only showing top 3 rows



In [99]:
3400/3360

1.0119047619047619

In [100]:
(
    (
    fire_ts_df
    .groupBy(day("IncidentDate"))
    .mean("NumAlarms")
    )
.show(3)
)

+-----------------+------------------+
|day(IncidentDate)|    avg(NumAlarms)|
+-----------------+------------------+
|               31|1.0119047619047619|
|               28|1.0085484996510816|
|               26|1.0069204152249136|
+-----------------+------------------+
only showing top 3 rows



In [101]:
(
    (
    fire_ts_df
    .groupBy(day("IncidentDate"))
    .avg("NumAlarms")
    )
.show(3)
)

+-----------------+------------------+
|day(IncidentDate)|    avg(NumAlarms)|
+-----------------+------------------+
|               31|1.0119047619047619|
|               28|1.0085484996510816|
|               26|1.0069204152249136|
+-----------------+------------------+
only showing top 3 rows



In [102]:
(
    (
    fire_ts_df
    .groupBy(day("IncidentDate"))
    .sum("NumAlarms")
    )
.show(3)
)

+-----------------+--------------+
|day(IncidentDate)|sum(NumAlarms)|
+-----------------+--------------+
|               31|          3400|
|               28|          5781|
|               26|          5529|
+-----------------+--------------+
only showing top 3 rows



#### Other common DataFrame operations.

In [103]:
import pyspark.sql.functions as F
(fire_ts_df
    .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),
    F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
.show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



> read the
API documentation for methods like stat(), describe(), correlation(),
covariance(), sampleBy(), approxQuantile(), frequentItems(), and so on.

## Typed Objects, Untyped Objects, and Generic Rows

In [104]:
from pyspark.sql import Row
row = Row(350, True, "Learning Spark 2E", None)

In [108]:
for i in range(4):
    print(f"{row[i]} is of type :- {type(row[i])}")

350 is of type :- <class 'int'>
True is of type :- <class 'bool'>
Learning Spark 2E is of type :- <class 'str'>
None is of type :- <class 'NoneType'>


In [120]:
mnm_df = (spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("mnm_dataset.csv"))
count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.groupBy("State", "Color")
.agg(count("Count")
.alias("Total"))
.orderBy("Total", ascending=False))
count_mnm_df.show(10)

+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
|   CA|Yellow| 1807|
|   WA| Green| 1779|
|   OR|Orange| 1743|
|   TX| Green| 1737|
|   TX|   Red| 1725|
|   CA| Green| 1723|
|   CO|Yellow| 1721|
|   CA| Brown| 1718|
|   CO| Green| 1713|
|   NV|Orange| 1712|
+-----+------+-----+
only showing top 10 rows



In [121]:
count_mnm_df.explain(True)

== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- Aggregate [State#2988, Color#2989], [State#2988, Color#2989, count(Count#2990) AS Total#3002L]
   +- Project [State#2988, Color#2989, Count#2990]
      +- Relation [State#2988,Color#2989,Count#2990] csv

== Analyzed Logical Plan ==
State: string, Color: string, Total: bigint
Sort [Total#3002L DESC NULLS LAST], true
+- Aggregate [State#2988, Color#2989], [State#2988, Color#2989, count(Count#2990) AS Total#3002L]
   +- Project [State#2988, Color#2989, Count#2990]
      +- Relation [State#2988,Color#2989,Count#2990] csv

== Optimized Logical Plan ==
Sort [Total#3002L DESC NULLS LAST], true
+- Aggregate [State#2988, Color#2989], [State#2988, Color#2989, count(Count#2990) AS Total#3002L]
   +- Relation [State#2988,Color#2989,Count#2990] csv

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Total#3002L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(Total#3002L DESC NULLS LAST, 200), ENSURE_REQUIREME