# Spark Data Wrangling

In [6]:
import pandas as pd

import pyspark
from pyspark.sql.functions import * 

from env import conn

## Acquire

In [7]:
# create the Spark environment: get the existing SparkSession or create a new one
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/21 11:23:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### load mpg data set from pydataset

In [8]:
from pydataset import data

In [9]:
# fetch the 'mpg' dataset through pydataset's data() and build a Spark DataFrame from it
mpg = spark.createDataFrame(data('mpg'))

In [10]:
# peek at the first 5 rows
mpg.show(n=5)

[Stage 0:>                                                          (0 + 1) / 1]

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



                                                                                

### write dataframe to file

- `json`: for writing to a local json file(s)
- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `jdbc`: for writing to a SQL database table

#### write file to json

In [11]:
# df.write.<format>(path): write the DataFrame out as a directory of JSON part files.
# mode='overwrite' replaces output left by a previous run; without it the default
# save mode raises an error when data/mpg_json already exists, so the notebook
# could not be re-run from a fresh kernel.
mpg.write.json('data/mpg_json', mode='overwrite')

                                                                                

In [12]:
import os

In [14]:
# list everything Spark wrote into the JSON output directory
json_files = os.listdir(path='data/mpg_json/')
json_files

['part-00008-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json',
 'part-00005-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json',
 '.part-00002-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 'part-00000-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json',
 '.part-00008-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 '.part-00001-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 'part-00002-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json',
 '._SUCCESS.crc',
 '.part-00006-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 'part-00007-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json',
 '.part-00005-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 'part-00006-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json',
 '.part-00003-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 '.part-00009-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 'part-00003-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json',
 '.part-00000-3b747b95-14d0-46d0-8b16-6f546a652234-c000.json.crc',
 '_SUCCESS',

In [16]:
# drop hidden entries (names starting with '.', e.g. the .crc checksum files)
json_files = list(filter(lambda name: not name.startswith('.'), json_files))

#### write dataframe to csv

In [32]:
# write the DataFrame as CSV part files with a header row,
# replacing any output left by a previous run
mpg.write.csv('data/mpg_csv', mode='overwrite', header='True')

                                                                                

In [33]:
# list everything Spark wrote into the CSV output directory
csv_files = os.listdir(path='data/mpg_csv')

In [34]:
# keep only the visible entries (skip names starting with '.')
csv_files = list(filter(lambda name: not name.startswith('.'), csv_files))
csv_files

['part-00002-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00003-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00000-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00001-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00006-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00007-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00008-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 '_SUCCESS',
 'part-00004-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00009-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv',
 'part-00005-12675834-c33b-452d-9235-9123f1b86cd2-c000.csv']

### read files
- spark.read.[type]

#### read json

In [22]:
# read the JSON output directory back in and look at one row
spark.read.format('json').load('data/mpg_json').show(n=1)

                                                                                

+-------+---+---+-----+---+---+---+------------+-----+----------+----+
|  class|cty|cyl|displ|drv| fl|hwy|manufacturer|model|     trans|year|
+-------+---+---+-----+---+---+---+------------+-----+----------+----+
|compact| 21|  4|  2.0|  f|  r| 29|  volkswagen|  gti|manual(m5)|1999|
+-------+---+---+-----+---+---+---+------------+-----+----------+----+
only showing top 1 row



In [23]:
# row count of the JSON data read back from disk
spark.read.format('json').load('data/mpg_json').count()

234

In [26]:
# first 10 rows of the JSON data read back from disk
spark.read.format('json').load('data/mpg_json').show(n=10)

+-------+---+---+-----+---+---+---+------------+-----+----------+----+
|  class|cty|cyl|displ|drv| fl|hwy|manufacturer|model|     trans|year|
+-------+---+---+-----+---+---+---+------------+-----+----------+----+
|compact| 21|  4|  2.0|  f|  r| 29|  volkswagen|  gti|manual(m5)|1999|
|compact| 19|  4|  2.0|  f|  r| 26|  volkswagen|  gti|  auto(l4)|1999|
|compact| 21|  4|  2.0|  f|  p| 29|  volkswagen|  gti|manual(m6)|2008|
|compact| 22|  4|  2.0|  f|  p| 29|  volkswagen|  gti|  auto(s6)|2008|
|compact| 17|  6|  2.8|  f|  r| 24|  volkswagen|  gti|manual(m5)|1999|
|compact| 33|  4|  1.9|  f|  d| 44|  volkswagen|jetta|manual(m5)|1999|
|compact| 21|  4|  2.0|  f|  r| 29|  volkswagen|jetta|manual(m5)|1999|
|compact| 19|  4|  2.0|  f|  r| 26|  volkswagen|jetta|  auto(l4)|1999|
|compact| 22|  4|  2.0|  f|  p| 29|  volkswagen|jetta|  auto(s6)|2008|
|compact| 21|  4|  2.0|  f|  p| 29|  volkswagen|jetta|manual(m6)|2008|
+-------+---+---+-----+---+---+---+------------+-----+----------+----+
only s

#### read csv

In [35]:
# row count of the CSV data read back with default options (no header handling)
spark.read.format('csv').load('data/mpg_csv').count()

244

In [36]:
# Why 10 more rows than the 234 in the JSON read? No header option is set here, so the
# header line written into each of the 10 part files is read back as a data row
# (it shows up as the first row of the output, under the generic _c0.._c10 names).
spark.read.csv('data/mpg_csv').show(n=10)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|         _c0|  _c1|  _c2| _c3|_c4|       _c5|_c6|_c7|_c8|_c9|   _c10|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
|  volkswagen|  gti|  2.0|1999|  4|manual(m5)|  f| 21| 29|  r|compact|
|  volkswagen|  gti|  2.0|1999|  4|  auto(l4)|  f| 19| 26|  r|compact|
|  volkswagen|  gti|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|compact|
|  volkswagen|  gti|  2.0|2008|  4|  auto(s6)|  f| 22| 29|  p|compact|
|  volkswagen|  gti|  2.8|1999|  6|manual(m5)|  f| 17| 24|  r|compact|
|  volkswagen|jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|compact|
|  volkswagen|jetta|  2.0|1999|  4|manual(m5)|  f| 21| 29|  r|compact|
|  volkswagen|jetta|  2.0|1999|  4|  auto(l4)|  f| 19| 26|  r|compact|
|  volkswagen|jetta|  2.0|2008|  4|  auto(s6)|  f| 22| 29|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only s

In [41]:
# header=True: use the header row for column names instead of reading it as data
spark.read.csv('data/mpg_csv', header=True).show(n=5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|  volkswagen|  gti|  2.0|1999|  4|manual(m5)|  f| 21| 29|  r|compact|
|  volkswagen|  gti|  2.0|1999|  4|  auto(l4)|  f| 19| 26|  r|compact|
|  volkswagen|  gti|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|compact|
|  volkswagen|  gti|  2.0|2008|  4|  auto(s6)|  f| 22| 29|  p|compact|
|  volkswagen|  gti|  2.8|1999|  6|manual(m5)|  f| 17| 24|  r|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [42]:
# with the header handled, the row count matches the JSON read
spark.read.csv('data/mpg_csv', header=True).count()

234

### load source from 311_data in sql

In [56]:
# SQL text for the source table, plus the value returned by env.conn for the 311_data database
query = 'SELECT source_id, source_username FROM source'
url = conn('311_data')

In [46]:
# run the query through pandas first
pandas_df = pd.read_sql(sql=query, con=url)

In [47]:
# turn the pandas frame into a Spark DataFrame (schema is inferred)
sources = spark.createDataFrame(data=pandas_df)
sources

DataFrame[source_id: string, source_username: string]

In [48]:
# first 5 rows of the sources table
sources.show(n=5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



### data schema

In [49]:
# inspect the schema Spark inferred for the sources DataFrame
sources.schema

StructType([StructField('source_id', StringType(), True), StructField('source_username', StringType(), True)])

In [50]:
from pyspark.sql.types import StructType, StructField, StringType

In [52]:
# build the schema by hand: a StructType holding one StructField per column,
# each paired with its associated data type
schema = (
    StructType()
    .add('source_id', StringType())
    .add('source_username', StringType())
)
schema

StructType([StructField('source_id', StringType(), True), StructField('source_username', StringType(), True)])

In [53]:
%%timeit
# time DataFrame creation when the schema is supplied explicitly
spark.createDataFrame(pandas_df, schema=schema)

18.9 ms ± 6.06 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [54]:
%%timeit
# time DataFrame creation when no schema is passed
spark.createDataFrame(pandas_df)

12.4 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


### load cases from 311_data from sql

In [63]:
# SQL text for the cases table (capped at 100000 rows), plus the value returned by env.conn
q = 'SELECT * FROM cases LIMIT 100000'
url = conn('311_data')

In [64]:
# pull the cases query result into pandas
pandas_df = pd.read_sql(sql=q, con=url)

In [65]:
# convert the pandas frame of cases into the Spark DataFrame used for the rest of the notebook
df = spark.createDataFrame(data=pandas_df)
df

DataFrame[case_id: bigint, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: bigint]

In [68]:
# one full record, one field per line, values not truncated
df.show(n=1, truncate=False, vertical=True)

22/10/21 11:52:46 WARN TaskSetManager: Stage 41 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616                         
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antoni

In [70]:
# print the column names, types and nullability as a tree
df.printSchema()

root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: long (nullable = true)



## Prepare

- rename columns
- correct datatypes
- data transformation
- make new features
- join tables

### rename columns

In [71]:
# list the current column names before renaming
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'SLA_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district']

#### change SLA_due_date to case_due_date

In [72]:
# SLA_due_date -> case_due_date
df = df.withColumnRenamed(existing='SLA_due_date', new='case_due_date')

### correct datatypes

In [74]:
# re-inspect one full record after the rename
df.show(n=1, truncate=False, vertical=True)

22/10/21 12:06:37 WARN TaskSetManager: Stage 42 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616                         
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antoni

In [75]:
# (column, type) pairs: shows which columns still need their datatype corrected
df.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'bigint')]

#### change case_closed and case_late columns into boolean values

In [76]:
# which values does case_closed take?
df.select(col('case_closed')).distinct().show()

22/10/21 12:08:07 WARN TaskSetManager: Stage 43 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


[Stage 43:>                                                       (0 + 10) / 10]

+-----------+
|case_closed|
+-----------+
|        YES|
|         NO|
+-----------+



                                                                                

In [77]:
# which values does case_late take?
df.select(col('case_late')).distinct().show()

22/10/21 12:08:31 WARN TaskSetManager: Stage 46 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


[Stage 46:>                                                       (0 + 10) / 10]

+---------+
|case_late|
+---------+
|      YES|
|       NO|
+---------+



                                                                                

In [80]:
# preview: comparing against "YES" yields true/false values (df itself is not reassigned here)
(
    df.withColumn('case_closed', expr('case_closed =="YES"'))
    .select('case_closed')
    .distinct()
    .show()
)

22/10/21 12:10:45 WARN TaskSetManager: Stage 53 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


[Stage 53:>                                                       (0 + 10) / 10]

+-----------+
|case_closed|
+-----------+
|       true|
|      false|
+-----------+



                                                                                

In [82]:
# replace both YES/NO string columns with the boolean result of comparing to "YES"
df = (
    df.withColumn('case_closed', expr('case_closed =="YES"'))
    .withColumn('case_late', expr('case_late == "YES"'))
)

In [84]:
# distinct combinations of the two new boolean columns
df.select('case_closed', 'case_late').distinct().show(n=5)

22/10/21 12:14:49 WARN TaskSetManager: Stage 57 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


[Stage 57:>                                                       (0 + 10) / 10]

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|     true|
|      false|    false|
|      false|     true|
+-----------+---------+



                                                                                

#### change council_district datatype to string

In [86]:
# number of cases per council district
df.groupBy('council_district').count().show()

22/10/21 12:15:50 WARN TaskSetManager: Stage 63 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


[Stage 63:>                                                       (0 + 10) / 10]

+----------------+-----+
|council_district|count|
+----------------+-----+
|               0|  511|
|               7| 8010|
|               6| 8385|
|               9| 4773|
|               5|13404|
|               1|14640|
|              10| 6888|
|               3|11813|
|               8| 4980|
|               2|13619|
|               4|12977|
+----------------+-----+



                                                                                

In [88]:
# .cast('string') turns the numeric district id into a string column
df = df.withColumn('council_district', df['council_district'].cast('string'))

#### change dates to datetype

In [90]:
# look at the raw date strings before converting them
df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(n=5)

22/10/21 12:18:27 WARN TaskSetManager: Stage 66 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



format date strings: https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html

In [93]:
# preview: with no format argument these strings (e.g. '1/1/18 0:42') do not parse,
# so the rows shown come back null
(
    df.withColumn('case_opened_date', to_timestamp('case_opened_date'))
    .select('case_opened_date')
    .show(n=5)
)

22/10/21 12:21:08 WARN TaskSetManager: Stage 68 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+----------------+
|case_opened_date|
+----------------+
|            null|
|            null|
|            null|
|            null|
|            null|
+----------------+
only showing top 5 rows



In [94]:
# pattern for strings like '1/1/18 0:42': month/day/two-digit year, then hour:minute
fmt = 'M/d/yy H:m'
# preview the parse with the explicit format (df is not reassigned here)
(
    df.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .select('case_opened_date')
    .show(n=5)
)

22/10/21 12:21:09 WARN TaskSetManager: Stage 69 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------------+
|   case_opened_date|
+-------------------+
|2018-01-01 00:42:00|
|2018-01-01 00:46:00|
|2018-01-01 00:48:00|
|2018-01-01 01:29:00|
|2018-01-01 01:34:00|
+-------------------+
only showing top 5 rows



In [96]:
# parse each of the three date string columns into a timestamp using the shared format
for date_col in ('case_opened_date', 'case_closed_date', 'case_due_date'):
    df = df.withColumn(date_col, to_timestamp(date_col, fmt))

In [97]:
# verify the datatype corrections made so far
df.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

### data transformation

#### normalize address
- `lower`: lowercase everything
- `trim`: remove whitespace on the edges 

In [98]:
# raw addresses, untruncated
df.select('request_address').show(n=5, truncate=False)

22/10/21 12:25:27 WARN TaskSetManager: Stage 70 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  EL PASO ST, San Antonio, 78207 |
|2215  GOLIAD RD, San Antonio, 78223  |
|102  PALFREY ST W, San Antonio, 78223|
|114  LA GARDE ST, San Antonio, 78223 |
|734  CLEARVIEW DR, San Antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [100]:
# preview: lowercase the address, then trim whitespace from both ends
(
    df.withColumn('request_address', trim(lower(col('request_address'))))
    .select('request_address')
    .show(n=5)
)

22/10/21 12:26:50 WARN TaskSetManager: Stage 72 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



In [102]:
# keep the normalized address (lowercased, edges trimmed) and show the full values
df = df.withColumn('request_address', trim(lower(col('request_address'))))
df.select('request_address').show(n=5, truncate=False)

22/10/21 12:27:52 WARN TaskSetManager: Stage 74 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  el paso st, san antonio, 78207 |
|2215  goliad rd, san antonio, 78223  |
|102  palfrey st w, san antonio, 78223|
|114  la garde st, san antonio, 78223 |
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows



#### change num_days_late to num_weeks_late

In [103]:
# sample of num_days_late values
df.select('num_days_late').show(n=5)

22/10/21 13:33:13 WARN TaskSetManager: Stage 75 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------+
|num_days_late|
+-------------+
| -998.5087616|
| -2.012604167|
| -3.022337963|
| -15.01148148|
|  0.372164352|
+-------------+
only showing top 5 rows



In [105]:
# num_weeks_late: num_days_late divided by 7, added as a new column
df = df.withColumn('num_weeks_late', col('num_days_late') / 7)

In [106]:
# compare days and weeks side by side
df.select('num_days_late', 'num_weeks_late').show(n=5)

22/10/21 13:34:55 WARN TaskSetManager: Stage 77 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------+--------------------+
|num_days_late|      num_weeks_late|
+-------------+--------------------+
| -998.5087616|        -142.6441088|
| -2.012604167|        -0.287514881|
| -3.022337963|-0.43176256614285713|
| -15.01148148| -2.1444973542857144|
|  0.372164352|0.053166335999999995|
+-------------+--------------------+
only showing top 5 rows



#### cast council_district to int and zero-pad to 3 digits (preview only)

In [107]:
# current council_district values
df.select(col('council_district')).show()

22/10/21 13:37:34 WARN TaskSetManager: Stage 78 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
|               7|
|               7|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
+----------------+
only showing top 20 rows



In [110]:
# '%03d' zero-pads to at least 3 digits.
# Preview only: the padded column is displayed but not assigned back to df.
(
    df.withColumn(
        'council_district',
        format_string('%03d', col('council_district').cast('int')),
    )
    .select('council_district')
    .show(n=5)
)

22/10/21 13:38:43 WARN TaskSetManager: Stage 81 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows



In [111]:
# column types after the transformations above
df.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('num_weeks_late', 'double')]

### new features

In [112]:
# columns available before adding new features
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'num_weeks_late']

#### create zip code column

In [113]:
# the addresses shown end in the zip code
df.select('request_address').show(n=5, truncate=False)

22/10/21 13:42:22 WARN TaskSetManager: Stage 82 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  el paso st, san antonio, 78207 |
|2215  goliad rd, san antonio, 78223  |
|102  palfrey st w, san antonio, 78223|
|114  la garde st, san antonio, 78223 |
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [121]:
import pyspark.sql.functions as sf

In [125]:
# zip_code: the run of digits at the very end of the address (group 0 = the whole match)
df = df.withColumn(
    'zip_code',
    sf.regexp_extract(sf.col('request_address'), r'\d+$', 0),
)

In [126]:
# check one record, including the new zip_code column
df.show(n=1, vertical=True)

22/10/21 13:47:05 WARN TaskSetManager: Stage 89 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
only showing top 1 row



#### create case_lifetime column

- case_age: how long since the case first opened
- days_to_close: the number of days between the date the case was opened and the date it was closed
- case_lifetime: if the case is open, how long since the case opened, if the case is closed, the number of days to close


In [127]:
# one record for reference before deriving the date-based features
df.show(n=1, vertical=True)

22/10/21 13:47:50 WARN TaskSetManager: Stage 90 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
only showing top 1 row



In [130]:
# what current_timestamp() evaluates to right now
df.select(current_timestamp()).show(n=1, truncate=False)

22/10/21 13:49:38 WARN TaskSetManager: Stage 93 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+--------------------------+
|current_timestamp()       |
+--------------------------+
|2022-10-21 13:49:38.179276|
+--------------------------+
only showing top 1 row



In [132]:
# case_age: datediff() between the current timestamp and the date the case was opened
# (the value therefore depends on when the notebook is run)
df = df.withColumn(
    'case_age',
    datediff(current_timestamp(), col('case_opened_date')),
)

In [133]:
# opened date next to the derived case_age
df.select('case_opened_date', 'case_age').show(n=5)

22/10/21 13:51:31 WARN TaskSetManager: Stage 95 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------------+--------+
|   case_opened_date|case_age|
+-------------------+--------+
|2018-01-01 00:42:00|    1754|
|2018-01-01 00:46:00|    1754|
|2018-01-01 00:48:00|    1754|
|2018-01-01 01:29:00|    1754|
|2018-01-01 01:34:00|    1754|
+-------------------+--------+
only showing top 5 rows



In [134]:
# days_to_close: datediff() between the closed date and the opened date
df = df.withColumn(
    'days_to_close',
    datediff(col('case_closed_date'), col('case_opened_date')),
)

In [136]:
# inspect the new column on the first rows (closed cases in the output below)
df.select(
    'case_opened_date',
    'case_closed_date',
    'days_to_close',
).show(n=5)

22/10/21 13:52:44 WARN TaskSetManager: Stage 97 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-------------------+-------------------+-------------+
|   case_opened_date|   case_closed_date|days_to_close|
+-------------------+-------------------+-------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|            0|
|2018-01-01 00:46:00|2018-01-03 08:11:00|            2|
|2018-01-01 00:48:00|2018-01-02 07:57:00|            1|
|2018-01-01 01:29:00|2018-01-02 08:13:00|            1|
|2018-01-01 01:34:00|2018-01-01 13:29:00|            0|
+-------------------+-------------------+-------------+
only showing top 5 rows



In [140]:
# same inspection restricted to cases that are NOT closed
(
    df.select(
        'case_closed', 'case_opened_date', 'case_closed_date', 'days_to_close', 'case_age',
    )
    .filter('case_closed == false')
    .show(n=5)
)

22/10/21 13:54:40 WARN TaskSetManager: Stage 100 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-----------+-------------------+----------------+-------------+--------+
|case_closed|   case_opened_date|case_closed_date|days_to_close|case_age|
+-----------+-------------------+----------------+-------------+--------+
|      false|2018-01-02 09:39:00|            null|         null|    1753|
|      false|2018-01-02 10:49:00|            null|         null|    1753|
|      false|2018-01-02 13:45:00|            null|         null|    1753|
|      false|2018-01-02 14:09:00|            null|         null|    1753|
|      false|2018-01-02 14:34:00|            null|         null|    1753|
+-----------+-------------------+----------------+-------------+--------+
only showing top 5 rows



In [143]:
# case_lifetime: case_age when the case is not closed, otherwise days_to_close
df = df.withColumn(
    'case_lifetime',
    when(~col('case_closed'), col('case_age')).otherwise(col('days_to_close')),
)

In [144]:
# for cases that are not closed, case_lifetime should equal case_age
(
    df.select(
        'case_closed', 'case_opened_date', 'case_closed_date',
        'days_to_close', 'case_age', 'case_lifetime',
    )
    .filter('case_closed == false')
    .show(n=5)
)

22/10/21 13:57:58 WARN TaskSetManager: Stage 102 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+-----------+-------------------+----------------+-------------+--------+-------------+
|case_closed|   case_opened_date|case_closed_date|days_to_close|case_age|case_lifetime|
+-----------+-------------------+----------------+-------------+--------+-------------+
|      false|2018-01-02 09:39:00|            null|         null|    1753|         1753|
|      false|2018-01-02 10:49:00|            null|         null|    1753|         1753|
|      false|2018-01-02 13:45:00|            null|         null|    1753|         1753|
|      false|2018-01-02 14:09:00|            null|         null|    1753|         1753|
|      false|2018-01-02 14:34:00|            null|         null|    1753|         1753|
+-----------+-------------------+----------------+-------------+--------+-------------+
only showing top 5 rows



In [145]:
# case_lifetime was built from these two columns, so they are no longer needed
df = df.drop(
    'case_age',
    'days_to_close',
)

In [146]:
# list the remaining columns: case_age and days_to_close should be gone, case_lifetime kept
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'num_weeks_late',
 'zip_code',
 'case_lifetime']

### join the dept table from sql to our current df

In [147]:
# look at the column that will serve as the join key
df.select('dept_division').show(n=5)

22/10/21 13:59:58 WARN TaskSetManager: Stage 103 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
+----------------+
|   dept_division|
+----------------+
|Field Operations|
|     Storm Water|
|     Storm Water|
|Code Enforcement|
|Field Operations|
+----------------+
only showing top 5 rows



In [149]:
# Pull the whole dept table out of the SQL database into a pandas DataFrame.
q = 'SELECT * FROM dept'
dept = pd.read_sql(sql=q, con=url)

In [150]:
# Convert the pandas `dept` frame into a Spark DataFrame so it can be joined to `df`.
# The isinstance guard keeps this cell safe to re-run: once `dept` has been rebound
# to a Spark DataFrame, handing it to createDataFrame a second time would raise.
if isinstance(dept, pd.DataFrame):
    dept = spark.createDataFrame(dept)
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [151]:
# preview the first rows of the dept lookup table
dept.show(n=5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [153]:
# Left join: keep every row of df and attach the dept columns that match on dept_division.
df = df.join(other=dept, on='dept_division', how='left')

In [154]:
# peek at a single joined record, one field per line
df.show(n=1, vertical=True)

22/10/21 14:02:45 WARN TaskSetManager: Stage 112 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616         
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 5                    
 num_weeks_late         | -142.6441088         
 zip_code               | 78207                
 case_lifetime          | 0              

In [155]:
# remove the join key and the raw department name from df
df = df.drop(
    'dept_division',
    'dept_name',
)

In [156]:
# peek at a single record again now that dept_division and dept_name are dropped
df.show(n=1, vertical=True)

22/10/21 14:03:19 WARN TaskSetManager: Stage 118 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.
-RECORD 0--------------------------------------
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616         
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 5                    
 num_weeks_late         | -142.6441088         
 zip_code               | 78207                
 case_lifetime          | 0                    
 standardized_dept_name | Animal Care Ser

In [158]:
# Replace the text flag with a boolean: true where the value equals "YES".
df = df.withColumn(
    colName='dept_subject_to_SLA',
    col=expr('dept_subject_to_SLA == "YES"'),
)

In [159]:
# check the column types; dept_subject_to_SLA was just rebuilt from a comparison expression
df.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('num_weeks_late', 'double'),
 ('zip_code', 'string'),
 ('case_lifetime', 'int'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'boolean')]

### train, validate, test split

- use `.randomSplit(weights, seed)` to split the df into random subsets

In [160]:
# total number of rows in df before splitting
df.count()

22/10/21 14:07:03 WARN TaskSetManager: Stage 130 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

100000

In [162]:
# two-way split with 0.8 / 0.2 weights and a fixed seed (123) for the random sampling
train, test = df.randomSplit(weights=[0.8, 0.2], seed=123)

In [163]:
# row counts of the two splits
train.count(), test.count()

22/10/21 14:08:00 WARN TaskSetManager: Stage 139 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/10/21 14:08:04 WARN TaskSetManager: Stage 148 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

(80131, 19869)

In [164]:
# three-way split with 0.6 / 0.2 / 0.2 weights and the same fixed seed (123);
# this rebinds train and test from the earlier two-way split
train, val, test = df.randomSplit(weights=[0.6, 0.2, 0.2], seed=123)

In [165]:
# row counts of the three splits
train.count(), val.count(), test.count()

22/10/21 14:08:41 WARN TaskSetManager: Stage 157 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/10/21 14:08:43 WARN TaskSetManager: Stage 166 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/10/21 14:08:44 WARN TaskSetManager: Stage 175 contains a task of very large size (1653 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

(60118, 20013, 19869)