In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, IntegerType, BooleanType, FloatType
from pyspark.sql.functions import col, countDistinct, to_timestamp, year, max as spark_max, avg as spark_avg, sum as spark_sum, min as spark_min

In [None]:
# Obtain the Spark session for this notebook, creating one if none exists yet.
spark = (
    SparkSession.builder
    .appName("common_dataframe_operations")
    .getOrCreate()
)

# Read DataFrame

In [None]:
# Explicit schema for the fire-calls CSV, listed as (column name, Spark type)
# pairs in file order. Every column is declared nullable.
_column_types = [
    ("CallNumber", IntegerType),
    ("UnitID", StringType),
    ("IncidentNumber", IntegerType),
    ("CallType", StringType),
    ("CallDate", StringType),
    ("WatchDate", StringType),
    ("CallFinalDisposition", StringType),
    ("AvailableDtTm", StringType),
    ("Address", StringType),
    ("City", StringType),
    ("Zipcode", IntegerType),
    ("Battalion", StringType),
    ("StationArea", StringType),
    ("Box", StringType),
    ("OriginalPriority", StringType),
    ("Priority", StringType),
    ("FinalPriority", IntegerType),
    ("ALSUnit", BooleanType),
    ("CallTypeGroup", StringType),
    ("NumAlarms", IntegerType),
    ("UnitType", StringType),
    ("UnitSequenceInCallDispatch", IntegerType),
    ("FirePreventionDistrict", StringType),
    ("SupervisorDistrict", StringType),
    ("Neighborhood", StringType),
    ("Location", StringType),
    ("RowID", StringType),
    ("Delay", FloatType),
]
schema = StructType(
    [StructField(name, spark_type(), True) for name, spark_type in _column_types]
)

# Read the CSV with the declared schema (no inference); the first line of the
# file is treated as a header row.
fire_df = (
    spark.read.format("csv")
    .schema(schema)
    .option("header", "true")
    .load("/opt/bitnami/spark/custom_data/chapter3/sf-fire-calls.csv")
)
fire_df.show()

# Saving DataFrame

In [None]:
# Write fire_df as Parquet, replacing whatever is already in the output
# directory. The extra option is passed through to the Hadoop output committer
# and is intended to suppress the _SUCCESS marker file.
(
    fire_df.write
    .format("parquet")
    .mode("overwrite")
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .save("/opt/bitnami/spark/custom_data/chapter3/output/")
)

# Saving the DataFrame and creating a table

In [None]:
# Write fire_df as Parquet to an explicit path and register it in the catalog
# as the table "fire_df". Overwrite mode replaces any existing data/table.
(
    fire_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "/opt/bitnami/spark/custom_data/chapter3/output/")
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .saveAsTable("fire_df")
)

# Transformations and actions

## Projections and filters

In [None]:
# Project three columns and keep only rows whose CallType differs from
# "Medical Incident", then preview the first five rows untruncated.
not_medical_incident = col("CallType") != "Medical Incident"
few_fire_df = (
    fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .where(not_medical_incident)
)
few_fire_df.show(5, truncate=False)

In [None]:
# Count how many different non-null CallType values occur in the data.
call_type_present = col("CallType").isNotNull()
(
    fire_df
    .select("CallType")
    .where(call_type_present)
    .agg(countDistinct("CallType").alias("DistinctCallTypes"))
    .show()
)

In [None]:
# List the distinct non-null CallType values (first ten, untruncated).
# NOTE: alias() is applied to the DataFrame here, not to a column, so the
# displayed column header remains "CallType".
call_type_present = col("CallType").isNotNull()
(
    fire_df
    .select("CallType")
    .where(call_type_present)
    .distinct()
    .alias("DistinctCallTypes")
    .show(10, truncate=False)
)

## Renaming, adding and dropping columns

In [None]:
# Rename the Delay column, then preview rows where the renamed column
# exceeds 5.
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
delayed_over_five = col("ResponseDelayedinMins") > 5
(
    new_fire_df
    .select("ResponseDelayedinMins")
    .where(delayed_over_five)
    .show(5, truncate=False)
)

In [None]:
# Replace the three string date columns with parsed timestamp columns.
# Each new column is added before its source column is dropped, so the new
# columns end up appended at the end of the schema in this order.
date_pattern = "MM/dd/yyyy"
date_time_pattern = "MM/dd/yyyy hh:mm:ss a"
fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), date_pattern))
    .drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), date_pattern))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), date_time_pattern))
    .drop("AvailableDtTm")
)

# Preview the converted columns.
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, truncate=False)

In [None]:
# Show the distinct calendar years found in IncidentDate, sorted ascending.
incident_year = year("IncidentDate")
(
    fire_ts_df
    .select(incident_year)
    .distinct()
    .orderBy(incident_year)
    .show()
)

## Aggregations

In [None]:
# Count rows per non-null CallType and show the ten most frequent types.
call_type_present = col("CallType").isNotNull()
(
    fire_ts_df
    .select("CallType")
    .where(call_type_present)
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
    .show(10, truncate=False)
)

## Other common DataFrame operations

In [None]:
# Whole-DataFrame summary: total NumAlarms plus the average, minimum and
# maximum of ResponseDelayedinMins, shown as a single row.
summary_columns = [
    spark_sum("NumAlarms"),
    spark_avg("ResponseDelayedinMins"),
    spark_min("ResponseDelayedinMins"),
    spark_max("ResponseDelayedinMins"),
]
fire_ts_df.select(*summary_columns).show()

In [None]:
# Shut down the Spark session now that the notebook is finished with it.
spark.stop()