# 3. Apache Spark's Structured APIs


The Resilient Distributed Dataset (RDD) is the main data structure in Spark. An RDD has three important characteristics:
- Dependencies
- Partitions (locality information)
- Compute function: Partition => Iterator[T]
  
Locality information is important for efficient processing: each task is sent to the executor that is closest to the partition it has to read.

Spark can try to infer the schema of the data. It is good practice, however, to define the schema up front and pass it when reading the data: this prevents inference errors and avoids the extra work that schema inference requires.



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

# Obtain the notebook's Spark session (an already running one is reused),
# registered under the application name "AuthorAges".
spark: SparkSession = (
    SparkSession.builder
    .appName("AuthorAges")
    .getOrCreate()
)
# A bare expression on the last line makes the notebook display the session.
spark

In [None]:
# Build a tiny in-memory DataFrame of (name, age) pairs; passing only column
# names as the schema lets Spark infer the column types from the values.
data_df = spark.createDataFrame(
    [
        ("Broke", 20),
        ("Denny", 31),
        ("Jules", 30),
        ("TD", 35),
    ],
    ["name", "age"],
)

# Average age per name.
avg_df = (
    data_df
    .groupBy("name")
    .agg(F.avg("age"))
)
avg_df.show()

`StructType` represents the structure of a `pyspark.sql.types.Row`. Every column can be described with a `StructField`, and a collection of these wrapped in a `StructType` can be used as a schema definition.

In [None]:
# Explicit schema: one non-nullable StructField per column, generated from
# (column name, column type) pairs so the shared nullable=False flag is
# written only once.
schema = types.StructType(
    [
        types.StructField(column_name, column_type, nullable=False)
        for column_name, column_type in (
            ("id", types.IntegerType()),
            ("First", types.StringType()),
            ("Last", types.StringType()),
            ("Url", types.StringType()),
            ("Published", types.StringType()),
            ("Hits", types.IntegerType()),
            ("Campaings", types.ArrayType(types.StringType())),
        )
    ]
)
# Display the schema in its JSON-compatible dictionary form.
schema.jsonValue()

In [None]:
from pyspark.sql import types
from pyspark.sql import functions as F

# Explicit schema for the blog data. All fields are declared non-nullable
# except "Campaings", which keeps StructField's default nullability.
schema = types.StructType(
    [
        types.StructField("id", types.IntegerType(), False),
        types.StructField("First", types.StringType(), False),
        types.StructField("Last", types.StringType(), False),
        types.StructField("Url", types.StringType(), False),
        types.StructField("Published", types.StringType(), False),
        types.StructField("Hits", types.IntegerType(), False),
        types.StructField("Campaings", types.ArrayType(types.StringType())),
    ]
)

# One row per blog author, in the same column order as the schema above.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]],
]

blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()

# printSchema() writes the schema tree to stdout itself and returns None;
# wrapping it in print() would emit an extra, misleading "None" line.
blogs_df.printSchema()

# Three ways of expressing the same "Hits times two" column computation:
# an expression column multiplied in Python, a plain column reference
# multiplied in Python, and the whole computation as a SQL expression string.
blogs_df.select(F.expr("Hits") * 2).show(2)
blogs_df.select(F.col("Hits") * 2).show(2)
blogs_df.select(F.expr("Hits * 2")).show(2)

# Add a boolean column flagging rows with more than 10000 hits. The result is
# only shown; blogs_df itself is not reassigned.
blogs_df.withColumn("Big Hitters", (F.expr("Hits > 10000"))).show()
# The schema attribute is the StructType object itself, so print() is correct here.
print(blogs_df.schema)

```python
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.read

```

The `spark.read` attribute returns a `DataFrameReader` object.

In [None]:
data_path_csv = "data/sf-fire-calls.csv"

# Read the CSV using its first line as the header. Column types are inferred
# rather than declared, with samplingRatio set to 0.5 for the inference step.
fire_df = spark.read.option("inferSchema", "true").option("samplingRatio", 0.5).csv(data_path_csv, header=True)

# printSchema() prints the schema and returns None, so it is called as its own
# statement; pairing it with count() in a tuple made the cell display
# "(None, <count>)" instead of just the row count.
fire_df.printSchema()
fire_df.count()

In [None]:
# IPython shell escape: list everything in the "data" directory (long format,
# hidden entries included, human-readable sizes).
!ls -lah data

In [None]:
# Derive the output locations from the CSV path.
data_path_parquet = data_path_csv.replace(".csv", ".parquet")
# NOTE(review): data_path_parquet_table is computed but not used in this cell.
data_path_parquet_table = data_path_csv.replace(".csv", "_table.parquet")

# Write the DataFrame as Parquet files, replacing any previous output at that path.
fire_df.write.mode("overwrite").format("parquet").save(data_path_parquet)

# Save the DataFrame as a CSV-format table named "temp", replacing any existing one.
fire_df.write.mode("overwrite").format("csv").saveAsTable("temp")

In [None]:
# Tip: fire_df.show(2, vertical=True, truncate=False) prints full rows vertically.

# First five calls whose type is anything other than "Medical Incident",
# limited to three columns and shown without truncating long values.
(
    fire_df
    .select("IncidentNumber", "CallType", "AvailableDtTm")
    .where(F.col("CallType") != "Medical Incident")
    .show(5, truncate=False)
)

In [None]:
# Number of distinct, non-null call types, reported as "DistinctCallTypes".
(
    fire_df
    .select("CallType")
    .where(F.col("CallType").isNotNull())
    .agg(F.countDistinct("CallType").alias("DistinctCallTypes"))
    .show()
)

In [None]:
# List each distinct, non-null call type without truncating long names.
(
    fire_df
    .select("CallType")
    .where(F.col("CallType").isNotNull())
    .distinct()
    .show(truncate=False)
)

In [None]:
# Summary statistics for the "Delay" column, exposed under the name
# "ResponseDelayedinMins" and restricted to values greater than 5.
(
    fire_df
    .select(F.col("Delay").alias("ResponseDelayedinMins"))
    .where(F.col("ResponseDelayedinMins") > 5)
    .describe()
    .show()
)

In [None]:
# Parse the CallDate strings into a timestamp column named "IncidentDate".
# The pattern uses a four-letter year ("yyyy"); the previous "yyy" was a typo.
# NOTE(review): assumes CallDate values look like 01/31/2018 — confirm against the CSV.
fires_ts_df = fire_df.withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "MM/dd/yyyy"))

# Distinct years present in the data, most recent first.
fires_ts_df.select(F.year("IncidentDate")).distinct().orderBy(F.year("IncidentDate"), ascending=False).show()

In [None]:
# Number of calls per non-null call type, most frequent first.
(
    fire_df
    .select("CallType")
    .where(F.col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .orderBy(F.col("count").desc())
    .show()
)

In [None]:
# Distinct call types containing "Fire" among incidents dated in 2018.
(
    fires_ts_df
    .where((F.year("IncidentDate") == "2018") & F.col("CallType").contains("Fire"))
    .select("CallType")
    .distinct()
    .show()
)

In [None]:
# Count of 2018 calls whose type contains "Fire", grouped by month of the
# incident date and listed from the busiest month down.
(
    fires_ts_df
    .where((F.year("IncidentDate") == "2018") & F.col("CallType").contains("Fire"))
    .select(F.month("IncidentDate").alias("month"))
    .groupBy("month")
    .count()
    .orderBy(F.col("count").desc())
    .show()
)

In [None]:
# Count of 2018 calls whose type contains "Fire", grouped by neighborhood and
# listed from the highest count down.
(
    fires_ts_df
    .where((F.year("IncidentDate") == "2018") & F.col("CallType").contains("Fire"))
    .groupBy("Neighborhood")
    .count()
    .orderBy(F.col("count").desc())
    .show()
)