# Data Wrangling

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Reuse the active SparkSession if one exists; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

Data comes from calls to San Antonio's 311 call center.

## Reading Data

In [2]:
# Comma-separated file; the first row supplies column names and Spark infers
# the column types. NOTE: source.csv must exist in the working directory --
# the saved output below shows it was missing when this cell last ran.
df = spark.read.csv(
    "source.csv",
    header=True,
    inferSchema=True,
    sep=",",
)

AnalysisException: Path does not exist: file:/Users/jeffakins/codeup data science/spark/source.csv

The above code could also be written like so:

In [None]:
# Same read expressed through the generic reader: pick the format, set all the
# reader options in a single call, then load the path.
(
    spark.read
    .format("csv")
    .options(sep=",", inferSchema=True, header=True)
    .load("source.csv")
)

### Data Schemas

Common Spark Data Types:

- `StringType`
- `IntegerType`
- `FloatType`

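`StructType` and `StructField` wrap the above: a `StructType` holds a list of `StructField`s, each naming one column and giving its data type.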

- `StructType`
    - `StructField`: `StringType`
    - `StructField`: `IntegerType`

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema: both columns are read as plain strings instead of relying
# on type inference.
schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in ("source_id", "source_username")
    ]
)

spark.read.schema(schema).csv("source.csv", header=True)

### Writing Data

In [None]:
# for demo purposes: load pydataset's mpg table as a Spark DataFrame
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

# JSON output; any existing mpg_json output is replaced.
mpg.write.mode("overwrite").json("mpg_json")

# like much else in spark, there are multiple ways we could do this:
(
    mpg.write
    .options(header="true")
    .mode("overwrite")
    .csv("mpg_csv")
)

## Data Preparation

In [None]:
# 311 case records: the header row gives column names; Spark infers the types.
df = spark.read.csv("case.csv", inferSchema=True, header=True)
# Show the first two rows vertically, without truncating values.
df.show(n=2, truncate=False, vertical=True)

### Renaming Columns

In [None]:
# Give the SLA due date a name consistent with the other case_* date columns.
df = df.withColumnRenamed(existing="SLA_due_date", new="case_due_date")

### Casting Data Types

In [None]:
# demonstrating we only have yes/no in each field
# demonstrating we only have yes/no in each field: count every combination
df.groupBy(["case_closed", "case_late"]).count().show()

In [None]:
# Replace each "YES"/"NO" text flag with a boolean (True exactly when "YES").
for flag_name in ("case_closed", "case_late"):
    df = df.withColumn(flag_name, expr(f'{flag_name} == "YES"'))

df.select("case_closed", "case_late").show(5)

In [None]:
# How many cases fall in each council district?
df.groupBy(df["council_district"]).count().show()

In [None]:
# Store the district as a string label rather than a number.
df = df.withColumn("council_district", df["council_district"].cast("string"))

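Use `to_timestamp` to convert the date strings into timestamps.

The format string follows [Java's `SimpleDateFormat`][1] conventions. Spark 3.0+ uses its own, largely compatible, [datetime patterns](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html).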

[1]: https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html

In [None]:
print("--- Before handling dates")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

# Pattern: month/day/two-digit year, then 24-hour hour:minute.
fmt = "M/d/yy H:mm"
# Each column must be parsed from its *own* string value; parsing every
# column from case_opened_date would silently turn the closed and due dates
# into copies of the opened date.
df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

print("--- After")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

### Data Transformations

Normalize the address

In [None]:
print("--- Before")
df.select("request_address").show(5)

# Lower-case the address and strip surrounding spaces so that equivalent
# addresses compare equal.
df = df.withColumn("request_address", lower(trim(col("request_address"))))

print("--- After")
df.select("request_address").show(5)

Convert the number of days late into the number of weeks late.

In [None]:
# One week = 7 days. withColumn already names the result, so no SQL alias is
# needed; Spark's `/` yields a fractional (double) result.
df = df.withColumn("num_weeks_late", col("num_days_late") / 7)

df.select("num_days_late", "num_weeks_late").show(5)

String formatting

In [None]:
# '%03d' means at least 3 digits, pad with 0s (e.g. 1 -> "001").
#
# format_string's %d needs an integer argument, so council_district is cast to
# int only inside the expression; the column itself is never stored as an int,
# and the final output is a zero-padded string.
df = df.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

df.select("council_district").show(5)

### New Features

In [None]:
# The zip code is taken to be the run of digits that ends the address; when
# the pattern does not match, Spark's regexp_extract yields an empty string.
df = df.withColumn(
    "zipcode",
    regexp_extract(col("request_address"), r"\d+$", idx=0),
)

df.select("zipcode").show(5)

- `case_age`: how old the case is, i.e. the number of days between when the case was opened and today
- `days_to_closed`: the number of days between when the case was opened and when it was closed
- `case_lifetime`: the number of days between when the case was opened and when it was closed; if the case is still open, the number of days since it was opened

In [None]:
# case_age:       days from case_opened_date until now
# days_to_closed: days from case_opened_date until case_closed_date
# case_lifetime:  case_age for cases flagged not closed, days_to_closed otherwise
df = (
    df.withColumn(
        "case_age",
        datediff(current_timestamp(), col("case_opened_date")),
    )
    .withColumn(
        "days_to_closed",
        datediff(col("case_closed_date"), col("case_opened_date")),
    )
    .withColumn(
        "case_lifetime",
        when(~col("case_closed"), col("case_age")).otherwise(col("days_to_closed")),
    )
)

lifetime_cols = [
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
]

# Closed cases first, then open ones.
df.select(*lifetime_cols).where(col("case_closed")).show(5)

df.select(*lifetime_cols).where(~col("case_closed")).show(5)

### Joining Department Data

In [None]:
# Department lookup table; header row for names, Spark infers the types.
dept = spark.read.csv("dept.csv", inferSchema=True, header=True)
dept.show(n=5)

Join using `dept_division`.

In [None]:
# Left join keeps every case, even when its dept_division has no match in dept.
# Joining on the column *name* makes Spark emit a single dept_division column,
# so it can be dropped by name. Dropping by Column reference
# (dept.dept_division) is a silent no-op after a name-based join.
df = (
    df.join(dept, on="dept_division", how="left")
    # keep only the standardized name, as it has much fewer unique values
    # NOTE(review): assumes dept_name comes only from dept.csv; if case.csv
    # also had a dept_name column, this would drop it as well.
    .drop("dept_division", "dept_name")
    .withColumnRenamed("standardized_dept_name", "department")
    # convert the "YES"/"NO" flag to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

df.show(2, vertical=True)

## Train Test Split

In [None]:
# 80/20 train/test split. A fixed seed keeps the split reproducible across
# Restart & Run All. NOTE: reproducibility also assumes df's partitioning is
# stable between runs (caching df before splitting helps).
train, test = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# 60/20/20 train/validate/test split (replaces any earlier train/test split).
# A fixed seed keeps the split reproducible across Restart & Run All.
train, validate, test = df.randomSplit([0.6, 0.2, 0.2], seed=42)