In [1]:
import os
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext


#### VARS #####
cr = '/Users/nadina/.gc/apd311.json'
project_id = 'apd311'
bucket_name = 'apd311'

In [2]:
from pyspark.sql import types
from pyspark.sql import functions as F

In [3]:
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('apd311') \
    .set("spark.jars", "./gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", cr)


sc = SparkContext(conf=conf)


hadoop_conf = sc._jsc.hadoopConfiguration()
# the code below tells spark that when there is a location 
# "gs://..." it needs to connect to GCS
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", cr)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

spark = SparkSession.builder \
        .config(conf=sc.getConf()) \
        .getOrCreate()

24/04/13 15:04:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
df = spark.read.parquet(f"gs://{bucket_name}/raw/*")

                                                                                

In [5]:
df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+
| request_id|status_desc|           type_desc|method_received_desc|       created_date|        status_date|       updated_date|location_county|location_city|location_zip_code|      location_x|        location_y|location_lat|location_long|
+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+
|23-00524147|     Closed|      311 CC - Other|   Spot311 Interface|2023-11-25 10:33:52|2023-11-26 08:44:41|2023-11-26 08:44:41|         TRAVIS|       AUSTIN|            78702|3118611.49934361|1.00692810006935E7| 30.26374243| -97.72930334|
|23-00524146|     Closed|ATD - Concerns in..

                                                                                

In [8]:
df.printSchema()

root
 |-- request_id: string (nullable = true)
 |-- status_desc: string (nullable = true)
 |-- type_desc: string (nullable = true)
 |-- method_received_desc: string (nullable = true)
 |-- created_date: timestamp_ntz (nullable = true)
 |-- status_date: timestamp_ntz (nullable = true)
 |-- updated_date: timestamp_ntz (nullable = true)
 |-- location_county: string (nullable = true)
 |-- location_city: string (nullable = true)
 |-- location_zip_code: string (nullable = true)
 |-- location_x: double (nullable = true)
 |-- location_y: double (nullable = true)
 |-- location_lat: double (nullable = true)
 |-- location_long: double (nullable = true)



In [5]:
df.schema.names

['request_id',
 'status_desc',
 'type_desc',
 'method_received_desc',
 'created_date',
 'status_date',
 'updated_date',
 'location_county',
 'location_city',
 'location_zip_code',
 'location_x',
 'location_y',
 'location_lat',
 'location_long']

In [25]:
# check for nulls
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()



+----------+-----------+---------+--------------------+------------+-----------+------------+---------------+-------------+-----------------+----------+----------+------------+-------------+
|request_id|status_desc|type_desc|method_received_desc|created_date|status_date|updated_date|location_county|location_city|location_zip_code|location_x|location_y|location_lat|location_long|
+----------+-----------+---------+--------------------+------------+-----------+------------+---------------+-------------+-----------------+----------+----------+------------+-------------+
|         0|          0|        0|                   0|           0|          0|           0|          24513|        17042|            18007|     13352|     13352|       13352|        13352|
+----------+-----------+---------+--------------------+------------+-----------+------------+---------------+-------------+-----------------+----------+----------+------------+-------------+



                                                                                

In [6]:
# the amount of null values is small, we can safely drop them
df = df.dropna()

In [28]:
# check for nulls
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()



+----------+-----------+---------+--------------------+------------+-----------+------------+---------------+-------------+-----------------+----------+----------+------------+-------------+
|request_id|status_desc|type_desc|method_received_desc|created_date|status_date|updated_date|location_county|location_city|location_zip_code|location_x|location_y|location_lat|location_long|
+----------+-----------+---------+--------------------+------------+-----------+------------+---------------+-------------+-----------------+----------+----------+------------+-------------+
|         0|          0|        0|                   0|           0|          0|           0|              0|            0|                0|         0|         0|           0|            0|
+----------+-----------+---------+--------------------+------------+-----------+------------+---------------+-------------+-----------------+----------+----------+------------+-------------+



                                                                                

In [29]:
f'shape {df.count()}, {len(df.columns)}'

                                                                                

'shape 1801462, 14'

In [11]:
df.withColumn(
    'month_created', F.date_format(df.created_date, 'MMMM')
    ).show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----------+------------------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+
| request_id|       status_desc|           type_desc|method_received_desc|       created_date|        status_date|       updated_date|location_county|location_city|location_zip_code|      location_x|        location_y|location_lat|location_long|month_created|
+-----------+------------------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+
|23-00524147|            Closed|      311 CC - Other|   Spot311 Interface|2023-11-25 10:33:52|2023-11-26 08:44:41|2023-11-26 08:44:41|         TRAVIS|       AUSTIN|            78702|3118611.49934361|1.00692810006935E7| 3

                                                                                

In [7]:
df.withColumn(
    'month', F.date_format(df.created_date, 'MMMM')
    ).withColumn(
    'monthx', F.month('created_date')
    ).withColumn(
    'year', F.year('created_date')
    ).withColumn('month_created', F.concat('year', 'monthx')).drop('monthx').show(5)

[Stage 2:>                                                          (0 + 1) / 1]

+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+--------+----+-------------+
| request_id|status_desc|           type_desc|method_received_desc|       created_date|        status_date|       updated_date|location_county|location_city|location_zip_code|      location_x|        location_y|location_lat|location_long|   month|year|month_created|
+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+--------+----+-------------+
|23-00524147|     Closed|      311 CC - Other|   Spot311 Interface|2023-11-25 10:33:52|2023-11-26 08:44:41|2023-11-26 08:44:41|         TRAVIS|       AUSTIN|            78702|3118611.49934361|1.00692

                                                                                

In [8]:
df = df.withColumn(
    'month', F.date_format(df.created_date, 'MMMM')
    ).withColumn(
    'monthx', F.month('created_date')
    ).withColumn(
    'year', F.year('created_date')
    ).withColumn('month_created', F.concat('year', 'monthx'))

In [38]:
# create columns month and year based on created date
# df = df.withColumn(
#     'month_created', F.month('created_date')
#     ).withColumn(
#     'year_created', F.year('created_date')
#     )

In [39]:
df.show(5)

[Stage 19:>                                                         (0 + 1) / 1]

+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+
| request_id|status_desc|           type_desc|method_received_desc|       created_date|        status_date|       updated_date|location_county|location_city|location_zip_code|      location_x|        location_y|location_lat|location_long|month_created|year_created|
+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+
|23-00524147|     Closed|      311 CC - Other|   Spot311 Interface|2023-11-25 10:33:52|2023-11-26 08:44:41|2023-11-26 08:44:41|         TRAVIS|       AUSTIN|            78702|3118611.49934361|1.00692810

                                                                                

In [40]:
df.select('location_city').distinct().collect()

                                                                                

[Row(location_city='WEST LAKE HILL'),
 Row(location_city='DRIPPING SPRIN'),
 Row(location_city='HUTTO'),
 Row(location_city='THE HILLS'),
 Row(location_city='Manor'),
 Row(location_city='BASTROP'),
 Row(location_city='MANCHACA'),
 Row(location_city='ROUND ROCK'),
 Row(location_city='LAKEWAY'),
 Row(location_city='Pflugerville'),
 Row(location_city='SAN LEANNA'),
 Row(location_city='AUSTIN'),
 Row(location_city='Austin'),
 Row(location_city='MUSTANG RIDGE'),
 Row(location_city='ROLLINGWOOD'),
 Row(location_city='SUNSET VALLEY'),
 Row(location_city='MANOR'),
 Row(location_city='BUDA'),
 Row(location_city='austin'),
 Row(location_city='BEE CAVE'),
 Row(location_city='WEBBERVILLE'),
 Row(location_city='CREEDMOOR'),
 Row(location_city='TRAVIS COUNTY'),
 Row(location_city='LEANDER'),
 Row(location_city='VOLENTE'),
 Row(location_city='LAGO VISTA'),
 Row(location_city='manor'),
 Row(location_city='AUstin'),
 Row(location_city='JONESTOWN'),
 Row(location_city='CEDAR PARK'),
 Row(location_city='

In [43]:
# all values to title case
df.select(F.initcap('location_city')).distinct().collect()

                                                                                

[Row(initcap(location_city)='West Lake Hill'),
 Row(initcap(location_city)='Bee Cave'),
 Row(initcap(location_city)='Buda'),
 Row(initcap(location_city)='Lago Vista'),
 Row(initcap(location_city)='Manor'),
 Row(initcap(location_city)='Creedmoor'),
 Row(initcap(location_city)='The Hills'),
 Row(initcap(location_city)='Travis County'),
 Row(initcap(location_city)='Lakeway'),
 Row(initcap(location_city)='Pflugerville'),
 Row(initcap(location_city)='Austin'),
 Row(initcap(location_city)='Rollingwood'),
 Row(initcap(location_city)='Webberville'),
 Row(initcap(location_city)='Round Rock'),
 Row(initcap(location_city)='Sunset Valley'),
 Row(initcap(location_city)='Jonestown'),
 Row(initcap(location_city)='Bastrop'),
 Row(initcap(location_city)='Volente'),
 Row(initcap(location_city)='Hays'),
 Row(initcap(location_city)='Mustang Ridge'),
 Row(initcap(location_city)='Hutto'),
 Row(initcap(location_city)='Elgin'),
 Row(initcap(location_city)='Briarcliff'),
 Row(initcap(location_city)='Dripping S

In [9]:
 replace_dict = {
        'Travis County':'Austin',
        'Dripping Sprin':'Dripping Springs',
        'Austin 3-1-1':'Austin',
        'West Lake Hill':'West Lake Hills',
        'A':'Austin',
        'Austin 5 Etj':'Austin',
        'Village Of The Hills':'The Hills',
        'Village Of San Leanna':'San Leanna',
        'Village Of Webberville':'Webberville',
        'Village Of Creedmoor':'Creedmoor',
        'Village Of Mustang Ridge':'Mustang Ridge',
        'Village Of Point Venture':'Point Venture',
        'Village Of Briarcliff':'Briarcliff',
        'Village Of Volente':'Volente',
        'Aust':'Austin',
        'Ausitn':'Austin',
        'Austibn':'Austin',
        'Austn':'Austin',
        'Austi':'Austin',
        'Aus':'Austin',
        'Atx':'Austin',
        'Au':'Austin',
        'Austtin':'Austin',
        'Austin `':'Austin',
        'Austin.':'Austin',
        'Austun':'Austin',
        'Austin, Tx':'Austin'
}

In [50]:
df.withColumn(
    'location_city', F.initcap('location_city')
).withColumn(
    'location_city', F.col('location_city')
).replace(replace_dict).select('location_city').distinct().collect()

                                                                                

[Row(location_city='Bee Cave'),
 Row(location_city='Buda'),
 Row(location_city='Lago Vista'),
 Row(location_city='Manor'),
 Row(location_city='Creedmoor'),
 Row(location_city='West Lake Hills'),
 Row(location_city='The Hills'),
 Row(location_city='Lakeway'),
 Row(location_city='Pflugerville'),
 Row(location_city='Austin'),
 Row(location_city='Rollingwood'),
 Row(location_city='Webberville'),
 Row(location_city='Round Rock'),
 Row(location_city='Sunset Valley'),
 Row(location_city='Dripping Springs'),
 Row(location_city='Jonestown'),
 Row(location_city='Bastrop'),
 Row(location_city='Volente'),
 Row(location_city='Hays'),
 Row(location_city='Mustang Ridge'),
 Row(location_city='Hutto'),
 Row(location_city='Elgin'),
 Row(location_city='Briarcliff'),
 Row(location_city='Manchaca'),
 Row(location_city='San Leanna'),
 Row(location_city='Cedar Park'),
 Row(location_city='Leander'),
 Row(location_city='Victoria'),
 Row(location_city='Cedar Creek'),
 Row(location_city='San Marcos'),
 Row(locat

In [10]:
df = df.withColumn(
    'location_city', F.initcap('location_city')
).withColumn(
    'location_city', F.col('location_city')
).replace(replace_dict)

In [52]:
df.show(5)

[Stage 32:>                                                         (0 + 1) / 1]

+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+
| request_id|status_desc|           type_desc|method_received_desc|       created_date|        status_date|       updated_date|location_county|location_city|location_zip_code|      location_x|        location_y|location_lat|location_long|month_created|year_created|
+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+
|23-00524147|     Closed|      311 CC - Other|   Spot311 Interface|2023-11-25 10:33:52|2023-11-26 08:44:41|2023-11-26 08:44:41|         TRAVIS|       Austin|            78702|3118611.49934361|1.00692810

                                                                                

In [53]:
df.count(), len(df.columns)

                                                                                

(1801462, 16)

In [54]:
df.select('location_county').distinct().collect()

                                                                                

[Row(location_county='BASTROP'),
 Row(location_county='WILLIAMSON'),
 Row(location_county='TRAVIS'),
 Row(location_county='HAYS'),
 Row(location_county='CALDWELL'),
 Row(location_county='BURNET')]

In [55]:
df.schema.names

['request_id',
 'status_desc',
 'type_desc',
 'method_received_desc',
 'created_date',
 'status_date',
 'updated_date',
 'location_county',
 'location_city',
 'location_zip_code',
 'location_x',
 'location_y',
 'location_lat',
 'location_long',
 'month_created',
 'year_created']

In [56]:
df.select('status_desc').distinct().collect()

                                                                                

[Row(status_desc='Work In Progress'),
 Row(status_desc='Open'),
 Row(status_desc='Closed -Incomplete'),
 Row(status_desc='Duplicate (closed)'),
 Row(status_desc='Duplicate (open)'),
 Row(status_desc='Closed'),
 Row(status_desc='TO BE DELETED'),
 Row(status_desc='Resolved'),
 Row(status_desc='Closed -Incomplete Information'),
 Row(status_desc='Transferred'),
 Row(status_desc='New'),
 Row(status_desc='CancelledTesting'),
 Row(status_desc='Pending')]

In [11]:
replace_status = {
        'Work In Progress':'Open',
        'Duplicate (closed)':'Duplicate',
        'Duplicate (open)':'Duplicate',
        'TO BE DELETED':'Closed',
        'Resolved':'Closed',
        'Closed -Incomplete Information':'Closed -Incomplete',
        'New':'Open',
        'CancelledTesting':'Closed'
}

In [12]:
df = df.replace(replace_status, subset='status_desc')

In [66]:
df.select('method_received_desc').distinct().count()

                                                                                

20

In [67]:
df.select('method_received_desc').distinct().collect()

                                                                                

[Row(method_received_desc='Mobile Created'),
 Row(method_received_desc='E-Mail'),
 Row(method_received_desc='Open311'),
 Row(method_received_desc='Phone'),
 Row(method_received_desc='Other'),
 Row(method_received_desc='Social Media'),
 Row(method_received_desc='Radio'),
 Row(method_received_desc='Spot311 Interface'),
 Row(method_received_desc='CSR - Follow On SR'),
 Row(method_received_desc='Web'),
 Row(method_received_desc='Mobile Device'),
 Row(method_received_desc='Voice Mail'),
 Row(method_received_desc='Interface'),
 Row(method_received_desc='Field Request'),
 Row(method_received_desc='Mass Entry'),
 Row(method_received_desc='External Interface'),
 Row(method_received_desc='Walk In'),
 Row(method_received_desc='PremierOne CSR Mob'),
 Row(method_received_desc='Fax'),
 Row(method_received_desc='Mail')]

In [None]:
[Row(method_received_desc='Mobile Created'), # app
 Row(method_received_desc='E-Mail'),
 Row(method_received_desc='Open311'), # phone
 Row(method_received_desc='Phone'),
 Row(method_received_desc='Other'),
 Row(method_received_desc='Social Media'), 
 Row(method_received_desc='Radio'),
 Row(method_received_desc='Spot311 Interface'), # app
 Row(method_received_desc='CSR - Follow On SR'), # app
 Row(method_received_desc='Web'),
 Row(method_received_desc='Mobile Device'), # app
 Row(method_received_desc='Voice Mail'),
 Row(method_received_desc='Interface'), # web
 Row(method_received_desc='Field Request'),
 Row(method_received_desc='Mass Entry'),
 Row(method_received_desc='External Interface'), # web
 Row(method_received_desc='Walk In'),
 Row(method_received_desc='PremierOne CSR Mob'), # app
 Row(method_received_desc='Fax'),
 Row(method_received_desc='Mail')]

In [13]:
df = df.withColumn(
    'method_received', 
    F.when((F.col('method_received_desc') == 'Mobile Created'), 'app')\
    .when((F.col('method_received_desc') == 'Spot311 Interface'), 'app')\
    .when((F.col('method_received_desc') == 'CSR - Follow On SR'), 'app')\
    .when((F.col('method_received_desc') == 'PremierOne CSR Mob'), 'app')\
    .when((F.col('method_received_desc') == 'Mobile Device'), 'app')\
    .when((F.col('method_received_desc') =='Open311'), 'phone')\
    .when((F.col('method_received_desc') =='Phone'), 'phone')\
    .when((F.col('method_received_desc') =='E-Mail'), 'e-mail')\
    .when((F.col('method_received_desc') =='Web'), 'web')\
    .when((F.col('method_received_desc') =='External Interface'), 'web')\
    .otherwise('other')
)

In [14]:
df = df.withColumn(
    'method_received', 
    F.when((F.col('method_received_desc') == 'Mobile Created'), 'app')\
    .when((F.col('method_received_desc') == 'Spot311 Interface'), 'app')\
    .when((F.col('method_received_desc') == 'CSR - Follow On SR'), 'app')\
    .when((F.col('method_received_desc') == 'PremierOne CSR Mob'), 'app')\
    .when((F.col('method_received_desc') == 'Mobile Device'), 'app')\
    .when((F.col('method_received_desc') =='Open311'), 'phone')\
    .when((F.col('method_received_desc') =='Phone'), 'phone')\
    .when((F.col('method_received_desc') =='E-Mail'), 'e-mail')\
    .when((F.col('method_received_desc') =='Web'), 'web')\
    .when((F.col('method_received_desc') =='External Interface'), 'web')\
    .otherwise('other')
)

In [69]:
df.schema.names

['request_id',
 'status_desc',
 'type_desc',
 'method_received_desc',
 'created_date',
 'status_date',
 'updated_date',
 'location_county',
 'location_city',
 'location_zip_code',
 'location_x',
 'location_y',
 'location_lat',
 'location_long',
 'month_created',
 'year_created']

In [72]:
# too many types, 317
df.select('type_desc').distinct().count()

                                                                                

317

In [77]:
df

DataFrame[request_id: string, status_desc: string, type_desc: string, method_received_desc: string, created_date: timestamp_ntz, status_date: timestamp_ntz, updated_date: timestamp_ntz, location_county: string, location_city: string, location_zip_code: string, location_x: double, location_y: double, location_lat: double, location_long: double, month_created: int, year_created: int]

In [82]:
# new column how long case was open: status date - created date
df.withColumn(
    'case_duration', 
    F.datediff(
        F.to_date(F.col('status_date')),
        F.to_date(F.col('created_date'))
        )
).show()


[Stage 63:>                                                         (0 + 1) / 1]

+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+-------------+
| request_id|status_desc|           type_desc|method_received_desc|       created_date|        status_date|       updated_date|location_county|location_city|location_zip_code|      location_x|        location_y|location_lat|location_long|month_created|year_created|case_duration|
+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+-------------+
|23-00524147|     Closed|      311 CC - Other|   Spot311 Interface|2023-11-25 10:33:52|2023-11-26 08:44:41|2023-11-26 08:44:41|         TRAVIS|       Austin|   

                                                                                

In [15]:
df = df.withColumn(
    'case_duration_days', 
    F.datediff(
        F.to_date(F.col('status_date')),
        F.to_date(F.col('created_date'))
        )
)

In [85]:
# if the status is not closed yet, replace with null
df.withColumn(
    'case_duration_days', 
    F.when(F.col('status_desc') == 'Closed', F.col('case_duration_days'))\
    .otherwise(None)
).show()

[Stage 64:>                                                         (0 + 1) / 1]

+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+------------------+
| request_id|status_desc|           type_desc|method_received_desc|       created_date|        status_date|       updated_date|location_county|location_city|location_zip_code|      location_x|        location_y|location_lat|location_long|month_created|year_created|case_duration_days|
+-----------+-----------+--------------------+--------------------+-------------------+-------------------+-------------------+---------------+-------------+-----------------+----------------+------------------+------------+-------------+-------------+------------+------------------+
|23-00524147|     Closed|      311 CC - Other|   Spot311 Interface|2023-11-25 10:33:52|2023-11-26 08:44:41|2023-11-26 08:44:41|         TRAVIS|  

                                                                                

In [16]:
df = df.withColumn(
    'case_duration_days', 
    F.when(F.col('status_desc') == 'Closed', F.col('case_duration_days'))\
    .otherwise(None)
)

In [17]:
df.printSchema()

root
 |-- request_id: string (nullable = true)
 |-- status_desc: string (nullable = true)
 |-- type_desc: string (nullable = true)
 |-- method_received_desc: string (nullable = true)
 |-- created_date: timestamp_ntz (nullable = true)
 |-- status_date: timestamp_ntz (nullable = true)
 |-- updated_date: timestamp_ntz (nullable = true)
 |-- location_county: string (nullable = true)
 |-- location_city: string (nullable = true)
 |-- location_zip_code: string (nullable = true)
 |-- location_x: double (nullable = true)
 |-- location_y: double (nullable = true)
 |-- location_lat: double (nullable = true)
 |-- location_long: double (nullable = true)
 |-- month: string (nullable = true)
 |-- monthx: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month_created: string (nullable = true)
 |-- method_received: string (nullable = false)
 |-- case_duration_days: integer (nullable = true)



In [18]:
import pandas as pd

In [89]:
# make available for sql queries
df.createOrReplaceTempView('df')

In [95]:
df_pandas = spark.sql("""
        SELECT *
        FROM df
        LIMIT 100
    """).toPandas()
df_pandas.head(2)

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0,request_id,status_desc,type_desc,method_received_desc,created_date,status_date,updated_date,location_county,location_city,location_zip_code,location_x,location_y,location_lat,location_long,month_created,year_created,case_duration_days
0,23-00524147,Closed,311 CC - Other,Spot311 Interface,2023-11-25 10:33:52,2023-11-26 08:44:41,2023-11-26 08:44:41,TRAVIS,Austin,78702,3118611.0,10069280.0,30.263742,-97.729303,11,2023,1.0
1,23-00524146,Closed,ATD - Concerns in Right of Way,Spot311 Interface,2023-11-25 10:33:50,2023-12-05 08:58:50,2023-12-05 08:58:50,TRAVIS,Austin,78721,3132347.0,10075820.0,30.280823,-97.685314,11,2023,10.0


In [19]:
# easier way :)
df_pandas = df.limit(100).toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [103]:
df_pandas.columns

Index(['request_id', 'status_desc', 'type_desc', 'method_received_desc',
       'created_date', 'status_date', 'updated_date', 'location_county',
       'location_city', 'location_zip_code', 'location_x', 'location_y',
       'location_lat', 'location_long', 'month_created', 'year_created',
       'case_duration_days', 'method_received'],
      dtype='object')

In [23]:
# reorder columns

# drop 'updated_date' - > not needed for analysis
# drop 'location_x', 'location_y' -> state plane coordinates

new_order = ['request_id', 'status_desc', 'type_desc', 'method_received', 'method_received_desc',
       'created_date', 'month_created', 'year_created', 'status_date', 'case_duration_days',
        'location_county', 'location_city', 'location_zip_code',
       'location_lat', 'location_long', 
       ]

In [None]:
df = df.select(new_order)

In [22]:
df.dtypes

[('request_id', 'string'),
 ('status_desc', 'string'),
 ('type_desc', 'string'),
 ('method_received_desc', 'string'),
 ('created_date', 'timestamp_ntz'),
 ('status_date', 'timestamp_ntz'),
 ('updated_date', 'timestamp_ntz'),
 ('location_county', 'string'),
 ('location_city', 'string'),
 ('location_zip_code', 'string'),
 ('location_x', 'double'),
 ('location_y', 'double'),
 ('location_lat', 'double'),
 ('location_long', 'double'),
 ('month', 'string'),
 ('monthx', 'int'),
 ('year', 'int'),
 ('month_created', 'string'),
 ('method_received', 'string'),
 ('case_duration_days', 'int')]

In [108]:
df.printSchema()

root
 |-- request_id: string (nullable = true)
 |-- status_desc: string (nullable = true)
 |-- type_desc: string (nullable = true)
 |-- method_received: string (nullable = false)
 |-- method_received_desc: string (nullable = true)
 |-- created_date: timestamp_ntz (nullable = true)
 |-- month_created: integer (nullable = true)
 |-- year_created: integer (nullable = true)
 |-- status_date: timestamp_ntz (nullable = true)
 |-- case_duration_days: integer (nullable = true)
 |-- location_county: string (nullable = true)
 |-- location_city: string (nullable = true)
 |-- location_zip_code: string (nullable = true)
 |-- location_lat: double (nullable = true)
 |-- location_long: double (nullable = true)



In [3]:
# df.write.partitionBy('year', 'month').mode('overwrite').format('bigquery') \
#     .option('table', 'apd311.apd311') \
#     .save()

In [26]:
df = spark.read.parquet(f"gs://{bucket_name}/raw/*")
print('DF loaded')

# the amount of null values is small, we can safely drop them
df = df.dropna()
print('Dropped nulls')

# create columns month(December), year (YYYY), month created(YYYYMM) based on created date
df = df.withColumn(
    'month', F.date_format(df.created_date, 'MMMM')
    ).withColumn(
    'monthx', F.month('created_date')
    ).withColumn(
    'year', F.year('created_date')
    ).withColumn('month_created', F.concat('year', 'monthx')).drop('monthx')

print('Created date columns')
# standardize location_city
replace_dict = {
        'Travis County':'Austin',
        'Dripping Sprin':'Dripping Springs',
        'Austin 3-1-1':'Austin',
        'West Lake Hill':'West Lake Hills',
        'A':'Austin',
        'Austin 5 Etj':'Austin',
        'Village Of The Hills':'The Hills',
        'Village Of San Leanna':'San Leanna',
        'Village Of Webberville':'Webberville',
        'Village Of Creedmoor':'Creedmoor',
        'Village Of Mustang Ridge':'Mustang Ridge',
        'Village Of Point Venture':'Point Venture',
        'Village Of Briarcliff':'Briarcliff',
        'Village Of Volente':'Volente',
        'Aust':'Austin',
        'Ausitn':'Austin',
        'Austibn':'Austin',
        'Austn':'Austin',
        'Austi':'Austin',
        'Aus':'Austin',
        'Atx':'Austin',
        'Au':'Austin',
        'Austtin':'Austin',
        'Austin `':'Austin',
        'Austin.':'Austin',
        'Austun':'Austin',
        'Austin, Tx':'Austin'
}

df = df.withColumn(
    'location_city', F.initcap('location_city')
).withColumn(
    'location_city', F.col('location_city')
).replace(replace_dict)

# standardize status description
replace_status = {
        'Work In Progress':'Open',
        'Duplicate (closed)':'Duplicate',
        'Duplicate (open)':'Duplicate',
        'TO BE DELETED':'Closed',
        'Resolved':'Closed',
        'Closed -Incomplete Information':'Closed -Incomplete',
        'New':'Open',
        'CancelledTesting':'Closed'
}
df = df.replace(replace_status, subset='status_desc')

# create a new column method received, transform detailed description into generalized one
df = df.withColumn(
    'method_received', 
    F.when((F.col('method_received_desc') == 'Mobile Created'), 'app')\
    .when((F.col('method_received_desc') == 'Spot311 Interface'), 'app')\
    .when((F.col('method_received_desc') == 'CSR - Follow On SR'), 'app')\
    .when((F.col('method_received_desc') == 'PremierOne CSR Mob'), 'app')\
    .when((F.col('method_received_desc') == 'Mobile Device'), 'app')\
    .when((F.col('method_received_desc') =='Open311'), 'phone')\
    .when((F.col('method_received_desc') =='Phone'), 'phone')\
    .when((F.col('method_received_desc') =='E-Mail'), 'e-mail')\
    .when((F.col('method_received_desc') =='Web'), 'web')\
    .when((F.col('method_received_desc') =='External Interface'), 'web')\
    .otherwise('other')
)
print('Standartized values')
# new column how many days case was open: status date - created date
df = df.withColumn(
    'case_duration_days', 
    F.datediff(
        F.to_date(F.col('status_date')),
        F.to_date(F.col('created_date'))
        )
)
print('Created case duration days')
# if the status is not closed yet, replace with null
df = df.withColumn(
    'case_duration_days', 
    F.when(F.col('status_desc') == 'Closed', F.col('case_duration_days'))\
    .otherwise(None)
)

# reorder columns

# drop 'updated_date' - > not needed for analysis
# drop 'location_x', 'location_y' -> state plane coordinates

new_order = ['request_id', 'status_desc', 'type_desc', 'method_received', 'method_received_desc',
       'created_date', 'month_created', 'month', 'year', 'status_date', 'case_duration_days',
        'location_county', 'location_city', 'location_zip_code',
       'location_lat', 'location_long', 
       ]

df = df.select(new_order)
print('Reordered')

                                                                                

DF loaded
Dropped nulls
Created date columns
Standartized values
Created case duration days
Reordered


In [27]:
import pandas as pd 
df_pandas = df.limit(100).toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [28]:
df.dtypes

[('request_id', 'string'),
 ('status_desc', 'string'),
 ('type_desc', 'string'),
 ('method_received', 'string'),
 ('method_received_desc', 'string'),
 ('created_date', 'timestamp_ntz'),
 ('month_created', 'string'),
 ('month', 'string'),
 ('year', 'int'),
 ('status_date', 'timestamp_ntz'),
 ('case_duration_days', 'int'),
 ('location_county', 'string'),
 ('location_city', 'string'),
 ('location_zip_code', 'string'),
 ('location_lat', 'double'),
 ('location_long', 'double')]

24/04/14 09:10:23 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 5315751 ms exceeds timeout 120000 ms
24/04/14 09:10:23 WARN SparkContext: Killing executors is not supported by current scheduler.
24/04/14 09:10:26 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:641)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1111)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244)
	at s