# Spark Data Wrangling

In [11]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import * 

from env import user, password, host

In [12]:
def get_db_url(db):
    '''input df and output sql connection string'''
    return (f'mysql+pymysql://{user}:{password}@{host}/{db}')

## Acquire

In [14]:
#create enviroment
spark = SparkSession.builder.getOrCreate()
spark

### load mpg data set from pydataset

In [15]:
from pydataset import data

In [16]:
mpg = spark.createDataFrame(data('mpg'))
mpg

DataFrame[manufacturer: string, model: string, displ: double, year: bigint, cyl: bigint, trans: string, drv: string, cty: bigint, hwy: bigint, fl: string, class: string]

In [17]:
mpg.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



                                                                                

### write datafame to file

- `json`: for writing to a local json file(s)
- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `jdbc`: for writing to a SQL database table

#### write file to json

In [18]:
#df.write.type
mpg.write.json('data/mpg_json', mode='overwrite', )

                                                                                

In [19]:
import os

#### read json

In [21]:
[fn for fn in os.listdir('data/mpg_json/') if not fn.startswith('.')]

['part-00001-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00011-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00009-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00004-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00006-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00003-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00002-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00007-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 '_SUCCESS',
 'part-00005-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00008-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00010-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json',
 'part-00000-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json']

In [34]:
first_file = [fn for fn in os.listdir('data/mpg_json/') if not fn.startswith('.')]

In [35]:
len([fn for fn in os.listdir('data/mpg_json/') if not fn.startswith('.')][0])

57

In [23]:
first_file

'part-00001-ed6bf062-0d8d-4ce2-b08a-c7234523907a-c000.json'

In [32]:
spark.read.json(f'data/mpg_json/{first_file}').count()

25

In [28]:
spark.read.json('data/mpg_json').count()

234

#### write dataframe to csv

In [38]:
mpg.write.csv('data/mpg_json', mode='overwrite', header='True')

                                                                                

In [36]:
#df.write.format()
(
 mpg.write.format('csv')
    .mode('overwrite')
    .option('header', 'True')
    .save('data/mpg_csv')
)

                                                                                

### read files
- spark.read.[type]

#### read csv

In [37]:
#keep written csv headers
(
    spark.read.format("csv")
      .option("header", True)
      .load("data/mpg_csv")
).count()

234

### load source from 311_data in sql

In [39]:
#sql query
url = get_db_url('311_data')
query = 'select source_id, source_username from source'

In [40]:
#make pandas df
pandas_df = pd.read_sql(query, url)
pandas_df.head()

Unnamed: 0,source_id,source_username
0,100137,Merlene Blodgett
1,103582,Carmen Cura
2,106463,Richard Sanchez
3,119403,Betty De Hoyos
4,119555,Socorro Quiara


In [41]:
sources = spark.createDataFrame(pandas_df)

In [42]:
sources.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



### load cases from 311_data from sql

In [43]:
#sql query
db = '311_data'
query = 'select * from cases limit 100000'

In [44]:
pandas_df = pd.read_csv('311.csv')

In [46]:
# pandas_df.head()

In [None]:
#pandas df
# pandas_df = pd.read_sql(query, url)

In [47]:
#spark df
df = spark.createDataFrame(pandas_df)
df

DataFrame[case_id: bigint, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: bigint]

In [48]:
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616                          
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

## Prepare

- rename columns
- correct datatypes
- data transformation
- make new features
- join tables

### rename columns

#### change SLA_due_date to case_due_date

In [50]:
# withcolumnrenamed:
# similar to withcolumn, but just takes the names of the column
# and the thing with which to replace
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

### correct datatypes

#### change case_closed and case_late columns into boolean values

In [51]:
df.select('case_closed', 'case_late').show(1)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|        YES|       NO|
+-----------+---------+
only showing top 1 row



In [53]:
df.groupby('case_closed').count().show()

23/05/19 14:53:48 WARN TaskSetManager: Stage 31 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
[Stage 31:>                                                       (0 + 12) / 12]

+-----------+-----+
|case_closed|count|
+-----------+-----+
|        YES|98547|
|         NO| 1453|
+-----------+-----+



                                                                                

In [52]:
df.select('case_closed').distinct().show()

23/05/19 14:53:16 WARN TaskSetManager: Stage 28 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.

+-----------+
|case_closed|
+-----------+
|        YES|
|         NO|
+-----------+



                                                                                

In [57]:
# [thing for thing in df.columns]

In [55]:
from pyspark.sql import functions as F

In [70]:
#use condition to make true and false
df = df.withColumn(
    'case_closed',
    df.case_closed == 'YES'
    # F.expr('case_closed == "YES"')
).withColumn(
    'case_late',
    F.expr('case_late == "YES"')
)

#### change council_district datatype to string

In [74]:
#use .cast()
df = df.withColumn('council_district',
             F.col('council_district').cast('string'))

#### change dates to datetype

format date strings: https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html

In [76]:
df.select('case_opened_date',
          'case_closed_date').printSchema()

root
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)



In [78]:
df.select('case_opened_date',
          'case_closed_date').show(2)

+----------------+----------------+
|case_opened_date|case_closed_date|
+----------------+----------------+
|     1/1/18 0:42|    1/1/18 12:29|
|     1/1/18 0:46|     1/3/18 8:11|
+----------------+----------------+
only showing top 2 rows



In [81]:
fmt = 'M/d/yy H:m'

In [82]:
#use to_timestamp
df.select(
    F.to_timestamp('case_opened_date', fmt),
    F.to_timestamp('case_closed_date', fmt)
).show(2)

+------------------------------------------+------------------------------------------+
|to_timestamp(case_opened_date, M/d/yy H:m)|to_timestamp(case_closed_date, M/d/yy H:m)|
+------------------------------------------+------------------------------------------+
|                       2018-01-01 00:42:00|                       2018-01-01 12:29:00|
|                       2018-01-01 00:46:00|                       2018-01-03 08:11:00|
+------------------------------------------+------------------------------------------+
only showing top 2 rows



In [85]:
#use to_timestamp
df.withColumn('case_opened_date',
    F.to_timestamp('case_opened_date', fmt)
             ).withColumn(
    'case_closed_date',
    F.to_timestamp('case_closed_date', fmt)
).show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [86]:
#use to_timestamp
df = df.withColumn('case_opened_date',
    F.to_timestamp('case_opened_date', fmt)
             ).withColumn(
    'case_closed_date',
    F.to_timestamp('case_closed_date', fmt)
)

### data transformation

#### normalize address
- `lower`: lowercase everything
- `trim`: remove whitespace on the edges 

In [90]:
df = df.withColumn(
    'request_address',
    F.trim(F.lower('request_address'))
)

#### change num_days_late to num_weeks_late

In [93]:
df = df.withColumn(
    'num_weeks_late',
    F.expr('num_days_late / 7')
)

#### change council_district to int and pad with 00s

In [104]:
df = df.withColumn(
    'council_district',
    F.concat(F.lit('00'), F.col('council_district')).cast('int')
)

### new features

#### create zip code column

In [105]:
df.select('request_address').show(2, truncate=False)

+------------------------------------+
|request_address                     |
+------------------------------------+
|2315  el paso st, san antonio, 78207|
|2215  goliad rd, san antonio, 78223 |
+------------------------------------+
only showing top 2 rows



In [116]:
df = df.withColumn(
    'zip_code',
    F.regexp_extract('request_address',
                     '(\d+$)',1)
)


#### create case_lifetime column

- case_age: how long since the case first opened
- days_to_close: the number of days between days opened and days closed
- case_lifetime: if the case is open, how long since the case opened, if the case is closed, the number of days to close


In [118]:
#use datediff() to find the difference between two dates
df = df.withColumn(
    'case_age',
    F.datediff(
        F.current_timestamp(),
        'case_opened_date'
    )
)

In [119]:
# days to close:
df = df.withColumn(
    'days_to_close',
    F.datediff(
        'case_closed_date',
        'case_opened_date'
    )
)

In [120]:
df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
 case_age             | 1964                 
 days_to_close        | 0                    
only showing top 1 row



In [127]:
df.filter(F.col('case_closed') == 'true').show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
 case_age             | 1964                 
 days_to_close        | 0                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01

In [134]:
df.select(
    'case_age',
    'days_to_close',
    'case_opened_date',
    'case_closed_date',
'case_closed').show(1, vertical=True)

-RECORD 0-------------------------------
 case_age         | 1964                
 days_to_close    | 0                   
 case_opened_date | 2018-01-01 00:42:00 
 case_closed_date | 2018-01-01 12:29:00 
 case_closed      | true                
only showing top 1 row



In [146]:
df = df.withColumn(
    'case_lifetime',
    F.when(F.col('case_closed') == 'true', 
           F.col('days_to_close')
          ).otherwise(F.col('case_age'))
)

In [149]:
#drop unnecessary columns
df = df.drop('days_to_close').drop('case_age')

### join the dept table from sql to our current df

In [150]:
df.select('dept_division').show(5)

+----------------+
|   dept_division|
+----------------+
|Field Operations|
|     Storm Water|
|     Storm Water|
|Code Enforcement|
|Field Operations|
+----------------+
only showing top 5 rows



In [151]:
#get dept table from sql
query = 'select * from dept'

In [None]:
# url = get_db_url('311_data')
# dept = pd.read_sql(query, url)

In [152]:
dept = spark.createDataFrame(pd.read_csv('dept.csv'))
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [155]:
dept.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 2 rows



In [156]:
df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 zip_code             | 78207                
 case_lifetime        | 0                    
only showing top 1 row



In [158]:
df = df.join(dept,
        'dept_division',
        'left')

### train, validate, test split

- `.randomSplit` to split df

In [159]:
train, validate, test = df.randomSplit(
    [.6,.2,.2],
seed=1349)

In [160]:
train.count()

23/05/19 15:55:27 WARN TaskSetManager: Stage 99 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

60123

In [161]:
df.count()

23/05/19 15:55:37 WARN TaskSetManager: Stage 108 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

100000