In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd

In [2]:
def get_connection_url(db, username=os.getenv('sqlUSER'), host=os.getenv('sqlHOST'), password=os.getenv('sqlPSWD')):
    """
    This function will:
    - take username, pswd, host credentials from imported env module
    - output a formatted connection_url
    """
    return f'mysql+pymysql://{username}:{password}@{host}/{db}'

In [3]:
# Generate spark object to initialize a local Spark JVM process
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/23 10:27:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
appName = "Spark - Setting Log Level"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

23/08/23 10:27:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

df

DataFrame[source_id: string, source_username: string]

or

In [6]:
(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

In [7]:
# as opposed to the actual schema object associated with the DataFrame
df.schema

StructType([StructField('source_id', StringType(), True), StructField('source_username', StringType(), True)])

In [8]:
# This schema can be specified/assigned initially to (possibly) speed up loading of data
schema = StructType(
          [
              StructField("source_id", StringType()),
              StructField("source_name", StringType())
          ]  
)

df1 = spark.read.csv("source.csv", sep=",", header=True, schema=schema)

In [9]:
df1.show()

+---------+--------------------+
|source_id|         source_name|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [10]:
# for demo purposes
from pydataset import data

In [11]:
mpg = spark.createDataFrame(data("mpg"))

mpg.write.json("mpg_json", mode="overwrite")

                                                                                

In [12]:
# Or...
(
    mpg.write.format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("mpg_csv")
)

In [13]:
spark.read.json("mpg_json").count()

234

In [14]:
spark.read.csv("mpg_csv").count()

242

In [15]:
df = spark.read.csv("case.csv", header=True, inferSchema=True)
df.show(2, vertical=True, truncate=False) #Default: True = truncate strings longer than 20 chars


                                                                                

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

In [16]:
# eww...
df.show(2)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

**Rename Columns**

In [17]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

**Correct data types**

In [18]:
# demonstrating we only have yes/no in each field
df.groupby("case_closed", "case_late").count().show()

[Stage 20:>                                                         (0 + 8) / 8]

+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+





In [19]:
#Let's create two new boolean columns
df = df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
        "case_late", expr('case_late == "YES"'))

#Select just the two columns
df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [20]:
df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



Turn int into str

In [21]:
df.groupBy("council_district").count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+





In [22]:
df = df.withColumn("council_district", col("council_district").cast("string"))

Change to datetime object

In [23]:
#Before
df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [24]:
#After

fmt = "M/d/yy H:mm"

df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



Normalize strings

In [25]:
#Before
df.select("request_address").show(5,50)

+-------------------------------------+
|                      request_address|
+-------------------------------------+
| 2315  EL PASO ST, San Antonio, 78207|
|  2215  GOLIAD RD, San Antonio, 78223|
|102  PALFREY ST W, San Antonio, 78223|
| 114  LA GARDE ST, San Antonio, 78223|
|734  CLEARVIEW DR, San Antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [26]:
#After
df = df.withColumn("request_address", trim(lower(df.request_address)))

df.select("request_address").show(5,50)

+-------------------------------------+
|                      request_address|
+-------------------------------------+
| 2315  el paso st, san antonio, 78207|
|  2215  goliad rd, san antonio, 78223|
|102  palfrey st w, san antonio, 78223|
| 114  la garde st, san antonio, 78223|
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [27]:
df = df.withColumn("num_weeks_late", expr("num_days_late / 7"))

df.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



In [28]:
#df = df.withColumn("council_district", col("council_district").cast("int"))

# '%03d' means at least 3 digits, pad with 0s
#
# In order to use the format_string function the way we are, we'll need to
# convert council_district back to an integer temporarily, but the final output
# will be a string.
df = df.withColumn("council_district",
                  format_string("%03d", col("council_district").cast("int")))

df.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows



In [29]:
df.schema

StructType([StructField('case_id', IntegerType(), True), StructField('case_opened_date', TimestampType(), True), StructField('case_closed_date', TimestampType(), True), StructField('case_due_date', TimestampType(), True), StructField('case_late', BooleanType(), True), StructField('num_days_late', DoubleType(), True), StructField('case_closed', BooleanType(), True), StructField('dept_division', StringType(), True), StructField('service_request_type', StringType(), True), StructField('SLA_days', DoubleType(), True), StructField('case_status', StringType(), True), StructField('source_id', StringType(), True), StructField('request_address', StringType(), True), StructField('council_district', StringType(), False), StructField('num_weeks_late', DoubleType(), True)])

New Features

In [30]:
df.select("request_address").show(2, 50)

+------------------------------------+
|                     request_address|
+------------------------------------+
|2315  el paso st, san antonio, 78207|
| 2215  goliad rd, san antonio, 78223|
+------------------------------------+
only showing top 2 rows



In [31]:
# col: request_address
# regex pattern
# index:0, as default b'c we are requesting the last few digits "$"
df.select(regexp_extract("request_address", r"\d+$", 0)).show(2)

+----------------------------------------+
|regexp_extract(request_address, \d+$, 0)|
+----------------------------------------+
|                                   78207|
|                                   78223|
+----------------------------------------+
only showing top 2 rows



In [32]:
# \d+: greedy digit; $: end of line
df = df.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

df.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



In [33]:
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05

In [34]:
df = (
# calculates the number of days between the current timestamp & case_opened_date
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
# Calculates the number of days between the case_closed_date & case_opened_date
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
# since case_closed is boolean, we can check if it's T/F;
#If the case is not closed, it takes the value from the "case_age" column
#Otherwise, it takes the value from the "days_to_closed" column    
    .withColumn(
        "case_lifetime",
            when(expr("! case_closed"), col("case_age")).otherwise(col("days_to_closed"))
    )
)

In [35]:
df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 12:29:00|    2060|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-03 08:11:00|    2060|             2|            2|
|       true|2018-01-01 00:48:00|2018-01-02 07:57:00|    2060|             1|            1|
|       true|2018-01-01 01:29:00|2018-01-02 08:13:00|    2060|             1|            1|
|       true|2018-01-01 01:34:00|2018-01-01 13:29:00|    2060|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+----------------+--------+--------------+-------------+
|case_closed|   case_opened_date|case_closed_date|case_age

In [48]:
import sys
import os

home_directory_path = os.path.expanduser('~')
sys.path.append(home_directory_path +'/utils')
from env import host, username, password


def get_connection_url(db, user=username, host=host, password=password):
    '''
    This function uses my info from my env file to
    create a connection url to access the Codeup db.
    '''
    return f'mysql+pymysql://{user}:{password}@{host}/{db}'


ImportError: cannot import name 'username' from 'env' (/Users/martinreyes/utils/env.py)

In [44]:
url = get_connection_url("311_data")

In [45]:
query = """SELECT * FROM dept"""
dept = pd.read_sql(query, url)
dept = spark.createDataFrame(dept)
dept.show(4)

OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'None' ([Errno 8] nodename nor servname provided, or not known)")
(Background on this error at: https://sqlalche.me/e/14/e3q8)

In [None]:
df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert last col to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA")=="YES")
)

df.show(2, vertical=True)

**Train-Test Split**

In [None]:
train, test = df.randomSplit([0.8, 0.2])
train.count(), test.count()

In [None]:
# Add Validate
train, validate, test = df.randomSplit([0.6, 0.2, 0.2], seed=321)
train.count(), validate.count(), test.count()

In [None]:
train.show(2, vertical=True)